use std::collections::BTreeMap;
use std::fs::{self, File, OpenOptions};
use std::io::{BufWriter, Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
use std::sync::RwLock;
use crate::common::I64Set;
use crate::core::{DataType, Error, Result, Schema, SchemaColumn};
use crate::storage::mvcc::persistence::{deserialize_row_version, serialize_row_version};
use crate::storage::mvcc::version_store::RowVersion;
/// Magic number stored little-endian in the first 8 bytes of every snapshot header;
/// `FileHeader::from_bytes` rejects files that do not start with it. Legacy
/// (version <= 2) footers also end with this value.
const MAGIC_BYTES: u64 = 0x5354534456534844;
/// Magic number stored in the last 8 bytes (56..64) of a v3 footer.
const FOOTER_MAGIC: u64 = 0x53544E445354454E;
/// Format version produced by `SnapshotWriter`. Readers also accept versions <= 2,
/// which use a 16-byte header and a 48-byte footer.
const FILE_FORMAT_VERSION: u32 = 3;
/// Size in bytes of the v3 header. The fixed fields occupy bytes 0..49 and the rest
/// stays zero.
const FILE_HEADER_SIZE: usize = 64;
/// Size in bytes of one row-index entry: an `i64` row id followed by a `u64` file offset.
const INDEX_ENTRY_SIZE: usize = 16;
/// Size in bytes of the v3 footer. Bytes 44..56 are left zero.
const FOOTER_SIZE: usize = 64;
/// Batch-size threshold used when collecting rows in `DiskVersionStore::create_snapshot`.
const DEFAULT_BATCH_SIZE: usize = 1000;
/// Capacity of the `BufWriter` that `SnapshotWriter` wraps around the file (64 KiB).
const DEFAULT_BLOCK_SIZE: usize = 64 * 1024;
/// Serialized rows at least this many bytes long are tried with LZ4. The compressed
/// form is kept only when it is strictly smaller.
const ROW_COMPRESSION_THRESHOLD: usize = 64;
/// High bit of a row record's 4-byte length prefix. When set, the payload is LZ4 data
/// with a prepended size; the low 31 bits hold the payload length.
const COMPRESSED_LENGTH_FLAG: u32 = 0x8000_0000;
// Header feature bits. They are reserved: `FileHeader::new` writes `feature_flags = 0`
// and nothing in the visible code sets or tests these bits, hence `dead_code`.
#[allow(dead_code)]
const FEATURE_HAS_COMPRESSION: u32 = 1 << 0;
#[allow(dead_code)]
const FEATURE_HAS_SCHEMA_BLOCK: u32 = 1 << 1;
#[allow(dead_code)]
const FEATURE_HAS_STATISTICS: u32 = 1 << 2;
#[allow(dead_code)]
const FEATURE_HAS_BLOOM_FILTER: u32 = 1 << 3;
#[allow(dead_code)]
const FEATURE_HAS_MIN_MAX_INDEX: u32 = 1 << 4;
#[allow(dead_code)]
const FEATURE_INCREMENTAL_SNAPSHOT: u32 = 1 << 5;
/// Header stored at the start of every snapshot file.
///
/// v3 layout (little-endian, `FILE_HEADER_SIZE` bytes):
/// - magic @0..8, version @8..12, feature_flags @12..16, header_size @16..20
/// - creation_time @20..28, source_lsn @28..36, prev_snap_lsn @36..44
/// - schema_version @44..48, compression @48; the remaining bytes are zero
///
/// Legacy (version <= 2) headers are 16 bytes and carry only magic, version and flags.
#[derive(Debug, Clone, Copy)]
struct FileHeader {
/// Must equal `MAGIC_BYTES`.
magic: u64,
/// On-disk format version.
version: u32,
/// Feature bit set (`FEATURE_*`); written as 0 by `FileHeader::new`.
feature_flags: u32,
/// Header length in bytes. For v3 files the schema block starts at this offset.
header_size: u32,
/// Nanoseconds since the Unix epoch when the header was created (0 for legacy files).
creation_time: i64,
/// LSN passed to `SnapshotWriter::with_source_lsn` (0 for legacy files).
source_lsn: u64,
/// Set only through `with_prev_snap_lsn`, which nothing in the visible code calls.
prev_snap_lsn: u64,
/// Always written as 0 by `FileHeader::new`.
schema_version: u32,
/// Always written as 0 by `FileHeader::new`. Per-row compression is signalled through
/// `COMPRESSED_LENGTH_FLAG` instead.
compression: u8,
}
impl FileHeader {
    /// Builds a header for a new snapshot in the current format (`FILE_FORMAT_VERSION`).
    ///
    /// The header is stamped with the current wall-clock time in nanoseconds since the
    /// Unix epoch, or 0 if the clock reads before the epoch. Feature flags, LSNs, schema
    /// version and compression all start at 0.
    fn new() -> Self {
        let creation_time = crate::common::time_compat::SystemTime::now()
            .duration_since(crate::common::time_compat::UNIX_EPOCH)
            .map(|d| d.as_nanos() as i64)
            .unwrap_or(0);
        Self {
            magic: MAGIC_BYTES,
            version: FILE_FORMAT_VERSION,
            feature_flags: 0,
            header_size: FILE_HEADER_SIZE as u32,
            creation_time,
            source_lsn: 0,
            prev_snap_lsn: 0,
            schema_version: 0,
            compression: 0,
        }
    }

    /// Returns this header with `source_lsn` set to `lsn`.
    fn with_source_lsn(mut self, lsn: u64) -> Self {
        self.source_lsn = lsn;
        self
    }

    /// Returns this header with `prev_snap_lsn` set to `lsn`.
    #[allow(dead_code)]
    fn with_prev_snap_lsn(mut self, lsn: u64) -> Self {
        self.prev_snap_lsn = lsn;
        self
    }

    /// Encodes the header into the fixed `FILE_HEADER_SIZE`-byte v3 layout
    /// (little-endian). Bytes after offset 48 stay zero.
    fn to_bytes(self) -> [u8; FILE_HEADER_SIZE] {
        let mut buf = [0u8; FILE_HEADER_SIZE];
        buf[0..8].copy_from_slice(&self.magic.to_le_bytes());
        buf[8..12].copy_from_slice(&self.version.to_le_bytes());
        buf[12..16].copy_from_slice(&self.feature_flags.to_le_bytes());
        buf[16..20].copy_from_slice(&self.header_size.to_le_bytes());
        buf[20..28].copy_from_slice(&self.creation_time.to_le_bytes());
        buf[28..36].copy_from_slice(&self.source_lsn.to_le_bytes());
        buf[36..44].copy_from_slice(&self.prev_snap_lsn.to_le_bytes());
        buf[44..48].copy_from_slice(&self.schema_version.to_le_bytes());
        buf[48] = self.compression;
        buf
    }

    /// Decodes a header from `data`.
    ///
    /// Versions <= 2 use the 16-byte legacy layout (magic, version, flags). Version 3
    /// requires `FILE_HEADER_SIZE` bytes.
    ///
    /// # Errors
    /// Returns an error if any of the following holds:
    /// - `data` is too short for the detected version
    /// - the magic number does not match
    /// - the version is newer than `FILE_FORMAT_VERSION`
    /// - a v3 header declares a `header_size` smaller than `FILE_HEADER_SIZE`
    fn from_bytes(data: &[u8]) -> Result<Self> {
        if data.len() < 16 {
            return Err(Error::internal("header data too short"));
        }
        let magic = u64::from_le_bytes(data[0..8].try_into().unwrap());
        if magic != MAGIC_BYTES {
            return Err(Error::internal(format!(
                "invalid snapshot file: magic mismatch (expected {:#x}, got {:#x})",
                MAGIC_BYTES, magic
            )));
        }
        let version = u32::from_le_bytes(data[8..12].try_into().unwrap());
        // A newer format may lay out the header, footer or records differently.
        // Parsing it as v3 would silently misread every offset.
        if version > FILE_FORMAT_VERSION {
            return Err(Error::internal(format!(
                "unsupported snapshot format version {} (newest supported is {})",
                version, FILE_FORMAT_VERSION
            )));
        }
        if version <= 2 {
            let flags = u32::from_le_bytes(data[12..16].try_into().unwrap());
            return Ok(Self {
                magic,
                version,
                feature_flags: flags,
                header_size: 16,
                creation_time: 0,
                source_lsn: 0,
                prev_snap_lsn: 0,
                schema_version: 0,
                compression: 0,
            });
        }
        if data.len() < FILE_HEADER_SIZE {
            return Err(Error::internal("header data too short for v3 format"));
        }
        let feature_flags = u32::from_le_bytes(data[12..16].try_into().unwrap());
        let header_size = u32::from_le_bytes(data[16..20].try_into().unwrap());
        // The schema block starts at `header_size`. A smaller value would point back
        // into the fixed header fields.
        if (header_size as usize) < FILE_HEADER_SIZE {
            return Err(Error::internal(format!(
                "invalid snapshot header size {} (minimum {})",
                header_size, FILE_HEADER_SIZE
            )));
        }
        let creation_time = i64::from_le_bytes(data[20..28].try_into().unwrap());
        let source_lsn = u64::from_le_bytes(data[28..36].try_into().unwrap());
        let prev_snap_lsn = u64::from_le_bytes(data[36..44].try_into().unwrap());
        let schema_version = u32::from_le_bytes(data[44..48].try_into().unwrap());
        let compression = data[48];
        Ok(Self {
            magic,
            version,
            feature_flags,
            header_size,
            creation_time,
            source_lsn,
            prev_snap_lsn,
            schema_version,
            compression,
        })
    }

    /// Returns the offset where the schema block starts: 16 for legacy headers,
    /// otherwise the declared `header_size`.
    fn effective_header_size(&self) -> usize {
        if self.version <= 2 {
            16
        } else {
            self.header_size as usize
        }
    }
}
/// Footer near the end of the file that locates the row index and the
/// committed-transaction table.
///
/// v3 layout (`FOOTER_SIZE` bytes, little-endian):
/// - index_offset @0, index_size @8, row_count @16
/// - txn_ids_offset @24, txn_ids_count @32, data_checksum @40..44
/// - bytes 44..56 zero, magic @56..64
///
/// In files of version 2 and later, a 4-byte CRC32 of all preceding bytes follows the
/// footer.
#[derive(Debug, Clone, Copy)]
struct Footer {
/// File offset of the row index (`INDEX_ENTRY_SIZE`-byte entries).
index_offset: u64,
/// Length of the row index in bytes.
index_size: u64,
/// Number of rows appended by the writer.
row_count: u64,
/// File offset of the committed-transaction table. The v3 writer puts it right after
/// the index.
txn_ids_offset: u64,
/// Number of entries in that table. The v3 writer stores each entry as an
/// `(i64 txn_id, i64 create_time)` pair.
txn_ids_count: u64,
/// CRC32 over the row records only (length prefix + payload). Always 0 for legacy footers.
data_checksum: u32,
/// `FOOTER_MAGIC` for v3 footers, `MAGIC_BYTES` for legacy ones.
magic: u64,
}
/// Footer size for format versions <= 2: no checksum field, magic at bytes 40..48.
const LEGACY_FOOTER_SIZE: usize = 48;
impl Footer {
fn to_bytes(self) -> [u8; FOOTER_SIZE] {
let mut buf = [0u8; FOOTER_SIZE];
buf[0..8].copy_from_slice(&self.index_offset.to_le_bytes());
buf[8..16].copy_from_slice(&self.index_size.to_le_bytes());
buf[16..24].copy_from_slice(&self.row_count.to_le_bytes());
buf[24..32].copy_from_slice(&self.txn_ids_offset.to_le_bytes());
buf[32..40].copy_from_slice(&self.txn_ids_count.to_le_bytes());
buf[40..44].copy_from_slice(&self.data_checksum.to_le_bytes());
buf[56..64].copy_from_slice(&self.magic.to_le_bytes());
buf
}
fn from_bytes(data: &[u8], version: u32) -> Result<Self> {
if version <= 2 {
if data.len() < LEGACY_FOOTER_SIZE {
return Err(Error::internal("footer data too short for v2 format"));
}
let magic = u64::from_le_bytes(data[40..48].try_into().unwrap());
if magic != MAGIC_BYTES {
return Err(Error::internal(format!(
"invalid snapshot file: footer magic mismatch (expected {:#x}, got {:#x})",
MAGIC_BYTES, magic
)));
}
return Ok(Self {
index_offset: u64::from_le_bytes(data[0..8].try_into().unwrap()),
index_size: u64::from_le_bytes(data[8..16].try_into().unwrap()),
row_count: u64::from_le_bytes(data[16..24].try_into().unwrap()),
txn_ids_offset: u64::from_le_bytes(data[24..32].try_into().unwrap()),
txn_ids_count: u64::from_le_bytes(data[32..40].try_into().unwrap()),
data_checksum: 0,
magic,
});
}
if data.len() < FOOTER_SIZE {
return Err(Error::internal("footer data too short for v3 format"));
}
let magic = u64::from_le_bytes(data[56..64].try_into().unwrap());
if magic != FOOTER_MAGIC {
return Err(Error::internal(format!(
"invalid snapshot file: footer magic mismatch (expected {:#x}, got {:#x})",
FOOTER_MAGIC, magic
)));
}
Ok(Self {
index_offset: u64::from_le_bytes(data[0..8].try_into().unwrap()),
index_size: u64::from_le_bytes(data[8..16].try_into().unwrap()),
row_count: u64::from_le_bytes(data[16..24].try_into().unwrap()),
txn_ids_offset: u64::from_le_bytes(data[24..32].try_into().unwrap()),
txn_ids_count: u64::from_le_bytes(data[32..40].try_into().unwrap()),
data_checksum: u32::from_le_bytes(data[40..44].try_into().unwrap()),
magic,
})
}
fn size_for_version(version: u32) -> usize {
if version <= 2 {
LEGACY_FOOTER_SIZE
} else {
FOOTER_SIZE
}
}
}
fn serialize_snapshot_schema(schema: &Schema) -> Vec<u8> {
let mut buf = Vec::new();
buf.extend_from_slice(&(schema.table_name.len() as u16).to_le_bytes());
buf.extend_from_slice(schema.table_name.as_bytes());
buf.extend_from_slice(&(schema.columns.len() as u16).to_le_bytes());
for col in &schema.columns {
buf.extend_from_slice(&(col.name.len() as u16).to_le_bytes());
buf.extend_from_slice(col.name.as_bytes());
buf.push(col.data_type.as_u8());
if col.data_type == DataType::Vector {
buf.extend_from_slice(&col.vector_dimensions.to_le_bytes());
}
buf.push(if col.nullable { 1 } else { 0 });
buf.push(if col.primary_key { 1 } else { 0 });
buf.push(if col.auto_increment { 1 } else { 0 });
if let Some(ref default_expr) = col.default_expr {
buf.extend_from_slice(&(default_expr.len() as u16).to_le_bytes());
buf.extend_from_slice(default_expr.as_bytes());
} else {
buf.extend_from_slice(&0u16.to_le_bytes());
}
if let Some(ref check_expr) = col.check_expr {
buf.extend_from_slice(&(check_expr.len() as u16).to_le_bytes());
buf.extend_from_slice(check_expr.as_bytes());
} else {
buf.extend_from_slice(&0u16.to_le_bytes());
}
}
let created_nanos = schema.created_at.timestamp_nanos_opt().unwrap_or(0);
buf.extend_from_slice(&created_nanos.to_le_bytes());
let updated_nanos = schema.updated_at.timestamp_nanos_opt().unwrap_or(0);
buf.extend_from_slice(&updated_nanos.to_le_bytes());
buf.extend_from_slice(&(schema.foreign_keys.len() as u16).to_le_bytes());
for fk in &schema.foreign_keys {
buf.extend_from_slice(&(fk.column_index as u16).to_le_bytes());
buf.extend_from_slice(&(fk.column_name.len() as u16).to_le_bytes());
buf.extend_from_slice(fk.column_name.as_bytes());
buf.extend_from_slice(&(fk.referenced_table.len() as u16).to_le_bytes());
buf.extend_from_slice(fk.referenced_table.as_bytes());
buf.extend_from_slice(&(fk.referenced_column.len() as u16).to_le_bytes());
buf.extend_from_slice(fk.referenced_column.as_bytes());
buf.push(fk.on_delete.as_u8());
buf.push(fk.on_update.as_u8());
}
{
use super::persistence::serialize_value;
buf.extend_from_slice(&(schema.columns.len() as u16).to_le_bytes());
for col in &schema.columns {
if let Some(ref default_value) = col.default_value {
if let Ok(val_bytes) = serialize_value(default_value) {
buf.extend_from_slice(&(val_bytes.len() as u16).to_le_bytes());
buf.extend_from_slice(&val_bytes);
} else {
buf.extend_from_slice(&0u16.to_le_bytes());
}
} else {
buf.extend_from_slice(&0u16.to_le_bytes());
}
}
}
buf
}
/// Parses a schema block produced by `serialize_snapshot_schema`.
///
/// Column definitions are mandatory, and any truncation inside them is an error. The
/// trailing sections are parsed only if present:
/// - a missing or non-positive timestamp falls back to the current time
/// - a missing foreign-key section yields no foreign keys
/// - a missing or truncated default-value section leaves `default_value` as `None`
///   for the affected columns
///
/// Column ids are assigned from their position.
///
/// # Errors
/// Returns an error for truncated column or foreign-key data, and for a table name,
/// column name or expression that is not valid UTF-8.
fn deserialize_snapshot_schema(data: &[u8]) -> Result<Schema> {
    /// Reads an optional `i64` nanosecond timestamp at `*pos` and advances past it.
    /// A missing or non-positive value yields the current time.
    fn read_timestamp(data: &[u8], pos: &mut usize) -> chrono::DateTime<chrono::Utc> {
        if *pos + 8 > data.len() {
            return chrono::Utc::now();
        }
        let nanos = i64::from_le_bytes(data[*pos..*pos + 8].try_into().unwrap());
        *pos += 8;
        if nanos > 0 {
            chrono::DateTime::from_timestamp_nanos(nanos)
        } else {
            chrono::Utc::now()
        }
    }

    if data.len() < 4 {
        return Err(Error::internal("schema data too short"));
    }
    // `data.len() >= 4` guarantees that the table-name length prefix is present.
    let mut pos = 0;
    let name_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
    pos += 2;
    if pos + name_len > data.len() {
        return Err(Error::internal("invalid schema: missing table name"));
    }
    let table_name = String::from_utf8(data[pos..pos + name_len].to_vec())
        .map_err(|e| Error::internal(format!("invalid table name: {}", e)))?;
    pos += name_len;
    if pos + 2 > data.len() {
        return Err(Error::internal("invalid schema: missing column count"));
    }
    let column_count = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
    pos += 2;

    let mut columns = Vec::with_capacity(column_count);
    for i in 0..column_count {
        if pos + 2 > data.len() {
            return Err(Error::internal(format!(
                "invalid schema: missing column {} name length",
                i
            )));
        }
        let col_name_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
        pos += 2;
        if pos + col_name_len > data.len() {
            return Err(Error::internal(format!(
                "invalid schema: missing column {} name",
                i
            )));
        }
        let col_name = String::from_utf8(data[pos..pos + col_name_len].to_vec())
            .map_err(|e| Error::internal(format!("invalid column name: {}", e)))?;
        pos += col_name_len;
        if pos >= data.len() {
            return Err(Error::internal(format!(
                "invalid schema: missing column {} data type",
                i
            )));
        }
        // NOTE(review): an unrecognised type tag silently becomes `DataType::Null`.
        let data_type = DataType::from_u8(data[pos]).unwrap_or(DataType::Null);
        pos += 1;
        // The serializer writes a u16 dimension count only for vector columns.
        let vector_dimensions = if data_type == DataType::Vector {
            if pos + 2 > data.len() {
                return Err(Error::internal(format!(
                    "invalid schema: missing column {} vector dimensions",
                    i
                )));
            }
            let dims = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap());
            pos += 2;
            dims
        } else {
            0
        };
        if pos >= data.len() {
            return Err(Error::internal(format!(
                "invalid schema: missing column {} nullable",
                i
            )));
        }
        let nullable = data[pos] != 0;
        pos += 1;
        if pos >= data.len() {
            return Err(Error::internal(format!(
                "invalid schema: missing column {} primary_key",
                i
            )));
        }
        let primary_key = data[pos] != 0;
        pos += 1;
        if pos >= data.len() {
            return Err(Error::internal(format!(
                "invalid schema: missing column {} auto_increment",
                i
            )));
        }
        let auto_increment = data[pos] != 0;
        pos += 1;
        if pos + 2 > data.len() {
            return Err(Error::internal(format!(
                "invalid schema: missing column {} default expr length",
                i
            )));
        }
        let default_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
        pos += 2;
        let default_expr = if default_len > 0 {
            if pos + default_len > data.len() {
                return Err(Error::internal(format!(
                    "invalid schema: missing column {} default expr",
                    i
                )));
            }
            let expr = String::from_utf8(data[pos..pos + default_len].to_vec())
                .map_err(|e| Error::internal(format!("invalid default expr: {}", e)))?;
            pos += default_len;
            Some(expr)
        } else {
            None
        };
        if pos + 2 > data.len() {
            return Err(Error::internal(format!(
                "invalid schema: missing column {} check expr length",
                i
            )));
        }
        let check_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
        pos += 2;
        let check_expr = if check_len > 0 {
            if pos + check_len > data.len() {
                return Err(Error::internal(format!(
                    "invalid schema: missing column {} check expr",
                    i
                )));
            }
            let expr = String::from_utf8(data[pos..pos + check_len].to_vec())
                .map_err(|e| Error::internal(format!("invalid check expr: {}", e)))?;
            pos += check_len;
            Some(expr)
        } else {
            None
        };
        let name_lower = col_name.to_lowercase();
        columns.push(SchemaColumn {
            id: i,
            name: col_name,
            name_lower,
            data_type,
            nullable,
            primary_key,
            auto_increment,
            default_expr,
            default_value: None,
            check_expr,
            vector_dimensions,
        });
    }

    let created_at = read_timestamp(data, &mut pos);
    let updated_at = read_timestamp(data, &mut pos);

    let mut foreign_keys = Vec::new();
    if pos + 2 <= data.len() {
        let fk_count = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
        pos += 2;
        let truncated =
            || Error::internal("corrupted schema: truncated foreign key constraint data");
        for _ in 0..fk_count {
            if pos + 2 > data.len() {
                return Err(truncated());
            }
            let col_idx = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
            pos += 2;
            if pos + 2 > data.len() {
                return Err(truncated());
            }
            let col_name_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
            pos += 2;
            if pos + col_name_len > data.len() {
                return Err(truncated());
            }
            // NOTE(review): unlike column names, invalid UTF-8 in foreign-key names is
            // not an error; it silently becomes an empty string.
            let col_name =
                String::from_utf8(data[pos..pos + col_name_len].to_vec()).unwrap_or_default();
            pos += col_name_len;
            if pos + 2 > data.len() {
                return Err(truncated());
            }
            let ref_table_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
            pos += 2;
            if pos + ref_table_len > data.len() {
                return Err(truncated());
            }
            let ref_table =
                String::from_utf8(data[pos..pos + ref_table_len].to_vec()).unwrap_or_default();
            pos += ref_table_len;
            if pos + 2 > data.len() {
                return Err(truncated());
            }
            let ref_col_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
            pos += 2;
            if pos + ref_col_len > data.len() {
                return Err(truncated());
            }
            let ref_col =
                String::from_utf8(data[pos..pos + ref_col_len].to_vec()).unwrap_or_default();
            pos += ref_col_len;
            // on_delete and on_update are one byte each.
            if pos + 2 > data.len() {
                return Err(truncated());
            }
            let on_delete = crate::core::ForeignKeyAction::from_u8(data[pos])
                .unwrap_or(crate::core::ForeignKeyAction::Restrict);
            pos += 1;
            let on_update = crate::core::ForeignKeyAction::from_u8(data[pos])
                .unwrap_or(crate::core::ForeignKeyAction::Restrict);
            pos += 1;
            foreign_keys.push(crate::core::ForeignKeyConstraint {
                column_index: col_idx,
                column_name: col_name,
                referenced_table: ref_table,
                referenced_column: ref_col,
                on_delete,
                on_update,
            });
        }
    }

    if pos + 2 <= data.len() {
        use super::persistence::deserialize_value;
        let dv_count = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
        pos += 2;
        for column in columns.iter_mut().take(dv_count) {
            if pos + 2 > data.len() {
                break;
            }
            let val_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
            pos += 2;
            if val_len == 0 {
                continue;
            }
            if pos + val_len > data.len() {
                // The payload is truncated. Continuing would read the next length
                // prefix from inside this payload, so stop here.
                break;
            }
            column.default_value = deserialize_value(&data[pos..pos + val_len]).ok();
            pos += val_len;
        }
    }

    Ok(Schema::with_timestamps_and_foreign_keys(
        table_name,
        columns,
        foreign_keys,
        created_at,
        updated_at,
    ))
}
/// Streaming writer for a single snapshot file.
///
/// Write order: header (in the constructor), `write_schema`, any number of
/// `append_row` calls, then `finalize`. `finalize` writes the row index, the
/// committed-transaction table, the footer and a trailing whole-file CRC32.
pub struct SnapshotWriter {
/// Buffered handle to the file, opened read+write and truncated on creation.
file: BufWriter<File>,
/// Path of the file being written. `Drop` deletes it when `failed` is set.
file_path: PathBuf,
/// Absolute file offset where the next record will be written.
data_offset: u64,
/// Number of rows appended so far.
row_count: u64,
/// row_id -> file offset of that row's length-prefixed record.
row_index: BTreeMap<i64, u64>,
/// txn_id -> create_time for every appended row version.
committed_txn_ids: BTreeMap<i64, i64>,
/// Set by `fail()`. Makes `Drop` remove the partially written file.
failed: bool,
/// LSN recorded in the header.
source_lsn: u64,
/// CRC32 over the row records only. Stored in the footer as `data_checksum`.
data_hasher: crc32fast::Hasher,
/// CRC32 over every byte written before the trailing checksum.
file_hasher: crc32fast::Hasher,
/// Offset where row data begins (set by `write_schema`).
/// NOTE(review): not read anywhere in the visible code.
row_data_start: u64,
}
impl SnapshotWriter {
pub fn new(file_path: impl AsRef<Path>) -> Result<Self> {
Self::with_source_lsn(file_path, 0)
}
pub fn with_source_lsn(file_path: impl AsRef<Path>, source_lsn: u64) -> Result<Self> {
let file_path = file_path.as_ref().to_path_buf();
if let Some(parent) = file_path.parent() {
fs::create_dir_all(parent).map_err(|e| {
Error::internal(format!("failed to create snapshot directory: {}", e))
})?;
}
let file = OpenOptions::new()
.create(true)
.truncate(true)
.write(true)
.read(true)
.open(&file_path)
.map_err(|e| Error::internal(format!("failed to create snapshot file: {}", e)))?;
let mut writer = BufWriter::with_capacity(DEFAULT_BLOCK_SIZE, file);
let header = FileHeader::new().with_source_lsn(source_lsn);
let header_bytes = header.to_bytes();
writer
.write_all(&header_bytes)
.map_err(|e| Error::internal(format!("failed to write header: {}", e)))?;
let mut file_hasher = crc32fast::Hasher::new();
file_hasher.update(&header_bytes);
Ok(Self {
file: writer,
file_path,
data_offset: FILE_HEADER_SIZE as u64,
row_count: 0,
row_index: BTreeMap::new(),
committed_txn_ids: BTreeMap::new(),
failed: false,
source_lsn,
data_hasher: crc32fast::Hasher::new(),
file_hasher,
row_data_start: 0, })
}
pub fn fail(&mut self) {
self.failed = true;
}
pub fn source_lsn(&self) -> u64 {
self.source_lsn
}
pub fn write_schema(&mut self, schema: &Schema) -> Result<()> {
let schema_bytes = serialize_snapshot_schema(schema);
let len_bytes = (schema_bytes.len() as u32).to_le_bytes();
self.file
.write_all(&len_bytes)
.map_err(|e| Error::internal(format!("failed to write schema length: {}", e)))?;
self.file
.write_all(&schema_bytes)
.map_err(|e| Error::internal(format!("failed to write schema: {}", e)))?;
self.file_hasher.update(&len_bytes);
self.file_hasher.update(&schema_bytes);
self.data_offset += 4 + schema_bytes.len() as u64;
self.row_data_start = self.data_offset;
Ok(())
}
pub fn append_row(&mut self, row_id: i64, version: &RowVersion) -> Result<()> {
#[cfg(any(test, feature = "test-failpoints"))]
if crate::test_failpoints::SNAPSHOT_WRITE_FAIL.load(std::sync::atomic::Ordering::Acquire) {
return Err(Error::internal("failpoint: snapshot write"));
}
if self.row_index.contains_key(&row_id) {
return Err(Error::internal(format!(
"duplicate row_id in snapshot: {}",
row_id
)));
}
let row_bytes = serialize_row_version(version)?;
self.row_index.insert(row_id, self.data_offset);
self.committed_txn_ids
.insert(version.txn_id, version.create_time);
let (payload, is_compressed) = if row_bytes.len() >= ROW_COMPRESSION_THRESHOLD {
let compressed = lz4_flex::compress_prepend_size(&row_bytes);
if compressed.len() < row_bytes.len() {
(compressed, true)
} else {
(row_bytes, false)
}
} else {
(row_bytes, false)
};
let length_with_flag = if is_compressed {
(payload.len() as u32) | COMPRESSED_LENGTH_FLAG
} else {
payload.len() as u32
};
let len_bytes = length_with_flag.to_le_bytes();
self.file
.write_all(&len_bytes)
.map_err(|e| Error::internal(format!("failed to write row length: {}", e)))?;
self.data_hasher.update(&len_bytes);
self.data_hasher.update(&payload);
self.file_hasher.update(&len_bytes);
self.file_hasher.update(&payload);
self.file
.write_all(&payload)
.map_err(|e| Error::internal(format!("failed to write row: {}", e)))?;
self.data_offset += 4 + payload.len() as u64;
self.row_count += 1;
Ok(())
}
pub fn append_batch(&mut self, versions: &[(i64, RowVersion)]) -> Result<()> {
for (row_id, version) in versions {
self.append_row(*row_id, version)?;
}
Ok(())
}
pub fn finalize(&mut self) -> Result<()> {
#[cfg(any(test, feature = "test-failpoints"))]
if crate::test_failpoints::SNAPSHOT_SYNC_FAIL.load(std::sync::atomic::Ordering::Acquire) {
return Err(Error::internal("failpoint: snapshot sync"));
}
self.file
.flush()
.map_err(|e| Error::internal(format!("failed to flush: {}", e)))?;
let data_checksum = std::mem::take(&mut self.data_hasher).finalize();
let index_offset = self.data_offset;
let mut index_data = Vec::with_capacity(self.row_index.len() * INDEX_ENTRY_SIZE);
for (&row_id, &offset) in &self.row_index {
index_data.extend_from_slice(&row_id.to_le_bytes());
index_data.extend_from_slice(&offset.to_le_bytes());
}
let inner = self.file.get_mut();
inner
.seek(SeekFrom::Start(index_offset))
.map_err(|e| Error::internal(format!("failed to seek to index: {}", e)))?;
inner
.write_all(&index_data)
.map_err(|e| Error::internal(format!("failed to write index: {}", e)))?;
self.file_hasher.update(&index_data);
let txn_ids_offset = index_offset + index_data.len() as u64;
let mut txn_data = Vec::with_capacity(self.committed_txn_ids.len() * 16);
for (&txn_id, ×tamp) in &self.committed_txn_ids {
txn_data.extend_from_slice(&txn_id.to_le_bytes());
txn_data.extend_from_slice(×tamp.to_le_bytes());
}
inner
.write_all(&txn_data)
.map_err(|e| Error::internal(format!("failed to write txn IDs: {}", e)))?;
self.file_hasher.update(&txn_data);
let footer = Footer {
index_offset,
index_size: index_data.len() as u64,
row_count: self.row_count,
txn_ids_offset,
txn_ids_count: self.committed_txn_ids.len() as u64,
data_checksum,
magic: FOOTER_MAGIC,
};
let footer_bytes = footer.to_bytes();
inner
.write_all(&footer_bytes)
.map_err(|e| Error::internal(format!("failed to write footer: {}", e)))?;
self.file_hasher.update(&footer_bytes);
let crc = std::mem::take(&mut self.file_hasher).finalize();
inner
.write_all(&crc.to_le_bytes())
.map_err(|e| Error::internal(format!("failed to write CRC: {}", e)))?;
inner
.sync_all()
.map_err(|e| Error::internal(format!("failed to sync: {}", e)))?;
Ok(())
}
pub fn path(&self) -> &Path {
&self.file_path
}
}
impl Drop for SnapshotWriter {
    /// Flushes buffered output on a best-effort basis. If the writer was marked with
    /// `fail()`, it then deletes the incomplete snapshot file. Errors are discarded
    /// because `drop` has no way to report them.
    fn drop(&mut self) {
        let _ = self.file.flush();
        if !self.failed {
            return;
        }
        let _ = fs::remove_file(&self.file_path);
    }
}
/// Random-access reader for a snapshot file produced by `SnapshotWriter`.
///
/// The schema and row index are loaded eagerly by `open`. Row records are read from
/// disk on demand, and the ids of rows already handed out are tracked in
/// `loaded_row_ids`.
pub struct SnapshotReader {
/// Open file handle used for positioned row reads.
file: File,
/// Path the snapshot was opened from.
file_path: PathBuf,
/// NOTE(review): this field is read by `format_version`, `source_lsn` and
/// `creation_time`, so the `dead_code` allowance looks stale.
#[allow(dead_code)]
header: FileHeader,
/// Parsed footer (index location, row count, checksums).
footer: Footer,
/// Table schema decoded from the schema block.
schema: Schema,
/// row_id -> file offset of the row's length-prefixed record.
index: BTreeMap<i64, u64>,
/// Row ids already returned by `get_row` or marked with `mark_loaded`. `get_row`
/// returns `None` for these.
loaded_row_ids: RwLock<I64Set>,
/// Scratch buffer for the 4-byte record length prefix.
len_buffer: [u8; 4],
}
impl SnapshotReader {
pub fn open(file_path: impl AsRef<Path>) -> Result<Self> {
let file_path = file_path.as_ref().to_path_buf();
let mut file = File::open(&file_path)
.map_err(|e| Error::internal(format!("failed to open snapshot: {}", e)))?;
let file_size = file
.metadata()
.map_err(|e| Error::internal(format!("failed to get file metadata: {}", e)))?
.len();
const MIN_HEADER_SIZE: usize = 16;
if file_size < MIN_HEADER_SIZE as u64 {
return Err(Error::internal("snapshot file too small for header"));
}
let mut min_header_data = [0u8; MIN_HEADER_SIZE];
file.read_exact(&mut min_header_data)
.map_err(|e| Error::internal(format!("failed to read header: {}", e)))?;
let version = u32::from_le_bytes(min_header_data[8..12].try_into().unwrap());
let header = if version <= 2 {
FileHeader::from_bytes(&min_header_data)?
} else {
if file_size < FILE_HEADER_SIZE as u64 {
return Err(Error::internal("snapshot file too small for v3 header"));
}
let mut header_data = [0u8; FILE_HEADER_SIZE];
header_data[..MIN_HEADER_SIZE].copy_from_slice(&min_header_data);
file.read_exact(&mut header_data[MIN_HEADER_SIZE..])
.map_err(|e| Error::internal(format!("failed to read full header: {}", e)))?;
FileHeader::from_bytes(&header_data)?
};
let footer_size = Footer::size_for_version(header.version);
let min_file_size = header.effective_header_size() + footer_size;
if file_size < min_file_size as u64 {
return Err(Error::internal("snapshot file too small"));
}
let has_crc = header.version >= 2;
let footer_offset = if has_crc {
file_size - footer_size as u64 - 4
} else {
file_size - footer_size as u64
};
file.seek(SeekFrom::Start(footer_offset))
.map_err(|e| Error::internal(format!("failed to seek to footer: {}", e)))?;
let mut footer_data = vec![0u8; footer_size];
file.read_exact(&mut footer_data)
.map_err(|e| Error::internal(format!("failed to read footer: {}", e)))?;
let footer = Footer::from_bytes(&footer_data, header.version)?;
if has_crc {
let mut crc_buf = [0u8; 4];
file.read_exact(&mut crc_buf)
.map_err(|e| Error::internal(format!("failed to read CRC32: {}", e)))?;
let stored_crc = u32::from_le_bytes(crc_buf);
file.seek(SeekFrom::Start(0)).map_err(|e| {
Error::internal(format!("failed to seek for CRC verification: {}", e))
})?;
let mut hasher = crc32fast::Hasher::new();
let mut buf = vec![0u8; 64 * 1024]; let data_len = footer_offset + footer_size as u64;
let mut remaining = data_len;
while remaining > 0 {
let to_read = std::cmp::min(remaining as usize, buf.len());
file.read_exact(&mut buf[..to_read]).map_err(|e| {
Error::internal(format!("failed to read for CRC verification: {}", e))
})?;
hasher.update(&buf[..to_read]);
remaining -= to_read as u64;
}
let computed_crc = hasher.finalize();
if stored_crc != computed_crc {
return Err(Error::internal(format!(
"snapshot file CRC32 mismatch: stored={:#x}, computed={:#x}",
stored_crc, computed_crc
)));
}
}
file.seek(SeekFrom::Start(header.effective_header_size() as u64))
.map_err(|e| Error::internal(format!("failed to seek to schema: {}", e)))?;
let mut len_buf = [0u8; 4];
file.read_exact(&mut len_buf)
.map_err(|e| Error::internal(format!("failed to read schema length: {}", e)))?;
let schema_len = u32::from_le_bytes(len_buf) as usize;
let mut schema_data = vec![0u8; schema_len];
file.read_exact(&mut schema_data)
.map_err(|e| Error::internal(format!("failed to read schema data: {}", e)))?;
let schema = deserialize_snapshot_schema(&schema_data)?;
file.seek(SeekFrom::Start(footer.index_offset))
.map_err(|e| Error::internal(format!("failed to seek to index: {}", e)))?;
let num_entries = footer.index_size as usize / INDEX_ENTRY_SIZE;
let mut index_data = vec![0u8; footer.index_size as usize];
file.read_exact(&mut index_data)
.map_err(|e| Error::internal(format!("failed to read index: {}", e)))?;
let mut index = BTreeMap::new();
for i in 0..num_entries {
let offset = i * INDEX_ENTRY_SIZE;
let row_id = i64::from_le_bytes(index_data[offset..offset + 8].try_into().unwrap());
let file_offset =
u64::from_le_bytes(index_data[offset + 8..offset + 16].try_into().unwrap());
index.insert(row_id, file_offset);
}
Ok(Self {
file,
file_path,
header,
footer,
schema,
index,
loaded_row_ids: RwLock::new(I64Set::new()),
len_buffer: [0u8; 4],
})
}
pub fn schema(&self) -> &Schema {
&self.schema
}
pub fn format_version(&self) -> u32 {
self.header.version
}
pub fn row_count(&self) -> u64 {
self.footer.row_count
}
pub fn path(&self) -> &Path {
&self.file_path
}
pub fn source_lsn(&self) -> u64 {
self.header.source_lsn
}
pub fn creation_time(&self) -> i64 {
self.header.creation_time
}
pub fn has_unloaded_row(&self, row_id: i64) -> bool {
if self
.loaded_row_ids
.read()
.map(|s| s.contains(row_id))
.unwrap_or(false)
{
return false; }
self.index.contains_key(&row_id)
}
pub fn row_exists_in_index(&self, row_id: i64) -> bool {
self.index.contains_key(&row_id)
}
#[deprecated(since = "0.1.0", note = "Use has_unloaded_row() for clarity")]
pub fn has_row(&self, row_id: i64) -> bool {
self.has_unloaded_row(row_id)
}
pub fn get_row(&mut self, row_id: i64) -> Option<RowVersion> {
{
let loaded = self.loaded_row_ids.read().ok()?;
if loaded.contains(row_id) {
return None; }
}
let &offset = self.index.get(&row_id)?;
let version = self.read_row_at_offset(offset).ok()?;
if let Ok(mut loaded) = self.loaded_row_ids.write() {
loaded.insert(row_id);
}
Some(version)
}
pub fn get_rows(&mut self, row_ids: &[i64]) -> Vec<(i64, RowVersion)> {
let mut results = Vec::new();
for &row_id in row_ids {
if let Some(version) = self.get_row(row_id) {
results.push((row_id, version));
}
}
results
}
pub fn for_each<F>(&mut self, mut callback: F) -> Result<()>
where
F: FnMut(i64, RowVersion) -> bool,
{
let entries: Vec<(i64, u64)> = self.index.iter().map(|(&k, &v)| (k, v)).collect();
for (row_id, offset) in entries {
match self.read_row_at_offset(offset) {
Ok(version) => {
if !callback(row_id, version) {
break;
}
}
Err(_) => continue,
}
}
Ok(())
}
pub fn get_all_rows(&mut self) -> BTreeMap<i64, RowVersion> {
let mut results = BTreeMap::new();
let entries: Vec<(i64, u64)> = self.index.iter().map(|(&k, &v)| (k, v)).collect();
for (row_id, offset) in entries {
if let Ok(version) = self.read_row_at_offset(offset) {
results.insert(row_id, version);
}
}
results
}
fn read_row_at_offset(&mut self, offset: u64) -> Result<RowVersion> {
use std::io::Read;
self.file
.seek(SeekFrom::Start(offset))
.map_err(|e| Error::internal(format!("failed to seek: {}", e)))?;
self.file
.read_exact(&mut self.len_buffer)
.map_err(|e| Error::internal(format!("failed to read row length: {}", e)))?;
let length_with_flag = u32::from_le_bytes(self.len_buffer);
let is_compressed = (length_with_flag & COMPRESSED_LENGTH_FLAG) != 0;
let row_len = (length_with_flag & !COMPRESSED_LENGTH_FLAG) as usize;
let mut row_data = vec![0u8; row_len];
self.file
.read_exact(&mut row_data)
.map_err(|e| Error::internal(format!("failed to read row data: {}", e)))?;
let row_bytes = if is_compressed {
lz4_flex::decompress_size_prepended(&row_data)
.map_err(|e| Error::internal(format!("failed to decompress row data: {}", e)))?
} else {
row_data
};
deserialize_row_version(&row_bytes)
}
pub fn mark_loaded(&self, row_id: i64) {
if let Ok(mut loaded) = self.loaded_row_ids.write() {
loaded.insert(row_id);
}
}
pub fn is_loaded(&self, row_id: i64) -> bool {
self.loaded_row_ids
.read()
.map(|s| s.contains(row_id))
.unwrap_or(false)
}
pub fn index(&self) -> &BTreeMap<i64, u64> {
&self.index
}
}
/// Checks that a snapshot file is structurally sound, without loading its schema or rows.
///
/// Validates:
/// - the header and footer magic numbers and the minimum file size
/// - the whole-file CRC32 stored after the footer, for format version 2 and later
/// - that the footer's row-index region ends at or before the footer
///
/// On success, returns the header's source LSN (0 for legacy headers).
///
/// # Errors
/// Returns an error if a read fails or any check does not hold.
pub fn verify_snapshot_integrity(file_path: &Path) -> Result<u64> {
    let mut file = File::open(file_path)
        .map_err(|e| Error::internal(format!("failed to open snapshot for verification: {}", e)))?;
    let file_size = file
        .metadata()
        .map_err(|e| Error::internal(format!("failed to get file metadata: {}", e)))?
        .len();
    const MIN_HEADER_SIZE: usize = 16;
    if file_size < MIN_HEADER_SIZE as u64 {
        return Err(Error::internal("snapshot file too small for header"));
    }
    let mut min_header_data = [0u8; MIN_HEADER_SIZE];
    file.read_exact(&mut min_header_data)
        .map_err(|e| Error::internal(format!("failed to read header: {}", e)))?;
    let version = u32::from_le_bytes(min_header_data[8..12].try_into().unwrap());
    let header = if version <= 2 {
        FileHeader::from_bytes(&min_header_data)?
    } else {
        if file_size < FILE_HEADER_SIZE as u64 {
            return Err(Error::internal("snapshot file too small for v3 header"));
        }
        let mut header_data = [0u8; FILE_HEADER_SIZE];
        header_data[..MIN_HEADER_SIZE].copy_from_slice(&min_header_data);
        file.read_exact(&mut header_data[MIN_HEADER_SIZE..])
            .map_err(|e| Error::internal(format!("failed to read full header: {}", e)))?;
        FileHeader::from_bytes(&header_data)?
    };
    let footer_size = Footer::size_for_version(header.version);
    let min_file_size = header.effective_header_size() + footer_size;
    if file_size < min_file_size as u64 {
        return Err(Error::internal("snapshot file too small"));
    }
    // Files of version 2 and later end with a 4-byte CRC32 after the footer.
    let has_crc = header.version >= 2;
    let footer_offset = if has_crc {
        file_size - footer_size as u64 - 4
    } else {
        file_size - footer_size as u64
    };
    file.seek(SeekFrom::Start(footer_offset))
        .map_err(|e| Error::internal(format!("failed to seek to footer: {}", e)))?;
    let mut footer_data = vec![0u8; footer_size];
    file.read_exact(&mut footer_data)
        .map_err(|e| Error::internal(format!("failed to read footer: {}", e)))?;
    let footer = Footer::from_bytes(&footer_data, header.version)?;
    if has_crc {
        // The cursor now sits on the stored CRC, right after the footer.
        let mut crc_buf = [0u8; 4];
        file.read_exact(&mut crc_buf)
            .map_err(|e| Error::internal(format!("failed to read CRC32: {}", e)))?;
        let stored_crc = u32::from_le_bytes(crc_buf);
        file.seek(SeekFrom::Start(0))
            .map_err(|e| Error::internal(format!("failed to seek for CRC verification: {}", e)))?;
        let mut hasher = crc32fast::Hasher::new();
        let mut buf = vec![0u8; 64 * 1024];
        let mut remaining = footer_offset + footer_size as u64;
        while remaining > 0 {
            // Clamp in u64 before narrowing. On 32-bit targets, `remaining as usize`
            // could wrap to 0 for files over 4 GiB and spin this loop forever.
            let to_read = remaining.min(buf.len() as u64) as usize;
            file.read_exact(&mut buf[..to_read]).map_err(|e| {
                Error::internal(format!("failed to read for CRC verification: {}", e))
            })?;
            hasher.update(&buf[..to_read]);
            remaining -= to_read as u64;
        }
        let computed_crc = hasher.finalize();
        if stored_crc != computed_crc {
            return Err(Error::internal(format!(
                "snapshot CRC32 mismatch: stored={:#x}, computed={:#x}",
                stored_crc, computed_crc
            )));
        }
    }
    // The reader seeks to and allocates the index region from these footer fields.
    // They must describe a range that ends before the footer.
    let index_in_bounds = matches!(
        footer.index_offset.checked_add(footer.index_size),
        Some(end) if end <= footer_offset
    );
    if !index_in_bounds {
        return Err(Error::internal(format!(
            "snapshot index region out of bounds (offset {}, size {})",
            footer.index_offset, footer.index_size
        )));
    }
    Ok(header.source_lsn)
}
pub struct DiskVersionStore {
base_dir: PathBuf,
table_name: String,
readers: RwLock<Vec<SnapshotReader>>,
#[allow(dead_code)]
schema_hash: u64,
}
impl DiskVersionStore {
pub fn new(base_dir: impl AsRef<Path>, table_name: &str, schema: &Schema) -> Result<Self> {
let base_dir = base_dir.as_ref().to_path_buf();
let store_dir = base_dir.join(table_name);
fs::create_dir_all(&store_dir).map_err(|e| {
Error::internal(format!(
"failed to create snapshot directory for {}: {}",
table_name, e
))
})?;
Ok(Self {
base_dir,
table_name: table_name.to_string(),
readers: RwLock::new(Vec::new()),
schema_hash: schema_hash(schema),
})
}
pub fn snapshot_dir(&self) -> PathBuf {
self.base_dir.join(&self.table_name)
}
pub fn create_snapshot<F>(&self, row_iterator: F, schema: &Schema) -> Result<PathBuf>
where
F: FnMut(&mut dyn FnMut(i64, &RowVersion) -> bool),
{
let timestamp = chrono::Utc::now().format("%Y%m%d-%H%M%S%.3f");
let file_path = self
.snapshot_dir()
.join(format!("snapshot-{}.bin", timestamp));
let mut writer = SnapshotWriter::new(&file_path)?;
writer.write_schema(schema)?;
let mut row_iterator = row_iterator;
let mut batch: Vec<(i64, RowVersion)> = Vec::with_capacity(DEFAULT_BATCH_SIZE);
row_iterator(&mut |row_id, version| {
if !version.is_deleted() {
let mut snapshot_version = version.clone();
snapshot_version.txn_id = -1;
batch.push((row_id, snapshot_version));
if batch.len() >= DEFAULT_BATCH_SIZE {
if writer.append_batch(&batch).is_err() {
return false;
}
batch.clear();
}
}
true
});
if !batch.is_empty() {
writer.append_batch(&batch)?;
}
writer.finalize()?;
Ok(file_path)
}
pub fn load_snapshots(&self) -> Result<()> {
let snapshot_dir = self.snapshot_dir();
if !snapshot_dir.exists() {
return Ok(()); }
let mut snapshot_files: Vec<PathBuf> = fs::read_dir(&snapshot_dir)
.map_err(|e| Error::internal(format!("failed to read snapshot directory: {}", e)))?
.filter_map(|entry| entry.ok())
.map(|entry| entry.path())
.filter(|path| {
path.file_name()
.and_then(|n| n.to_str())
.map(|n| n.starts_with("snapshot-") && n.ends_with(".bin"))
.unwrap_or(false)
})
.collect();
snapshot_files.sort();
if snapshot_files.is_empty() {
return Ok(()); }
let newest = snapshot_files.last().unwrap();
match SnapshotReader::open(newest) {
Ok(reader) => {
let mut readers = self
.readers
.write()
.expect("snapshot readers lock poisoned in load_snapshots");
readers.push(reader);
Ok(())
}
Err(e) => {
for path in snapshot_files.iter().rev().skip(1) {
if let Ok(reader) = SnapshotReader::open(path) {
let mut readers = self
.readers
.write()
.expect("snapshot readers lock poisoned in load_snapshots fallback");
readers.push(reader);
return Ok(());
}
}
Err(e)
}
}
}
pub fn get_row(&self, row_id: i64) -> Option<RowVersion> {
let mut readers = self.readers.write().ok()?;
if readers.is_empty() {
return None;
}
let reader = readers.last_mut()?;
reader.get_row(row_id)
}
pub fn has_unloaded_row(&self, row_id: i64) -> bool {
let readers = self.readers.read().ok();
if let Some(readers) = readers {
if let Some(reader) = readers.last() {
return reader.has_unloaded_row(row_id);
}
}
false
}
#[deprecated(since = "0.1.0", note = "Use has_unloaded_row() for clarity")]
#[allow(deprecated)]
pub fn has_row(&self, row_id: i64) -> bool {
self.has_unloaded_row(row_id)
}
pub fn mark_row_loaded(&self, row_id: i64) {
if let Ok(readers) = self.readers.read() {
if let Some(reader) = readers.last() {
reader.mark_loaded(row_id);
}
}
}
pub fn cleanup_old_snapshots(&self, keep_count: usize) -> Result<()> {
let snapshot_dir = self.snapshot_dir();
if !snapshot_dir.exists() {
return Ok(());
}
let mut snapshot_files: Vec<PathBuf> = fs::read_dir(&snapshot_dir)
.map_err(|e| Error::internal(format!("failed to read snapshot directory: {}", e)))?
.filter_map(|entry| entry.ok())
.map(|entry| entry.path())
.filter(|path| {
path.file_name()
.and_then(|n| n.to_str())
.map(|n| n.starts_with("snapshot-") && n.ends_with(".bin"))
.unwrap_or(false)
})
.collect();
snapshot_files.sort();
snapshot_files.reverse();
let mut valid_kept = 0usize;
let mut surviving_timestamps: Vec<String> = Vec::new();
for snap_file in &snapshot_files {
if valid_kept < keep_count {
if verify_snapshot_integrity(snap_file).is_ok() {
valid_kept += 1;
if let Some(ts) = snap_file
.file_name()
.and_then(|n| n.to_str())
.and_then(|n| n.strip_prefix("snapshot-"))
.and_then(|n| n.strip_suffix(".bin"))
{
surviving_timestamps.push(ts.to_string());
}
continue; }
}
if let Err(e) = fs::remove_file(snap_file) {
eprintln!(
"Warning: failed to delete old snapshot {:?}: {}",
snap_file, e
);
}
let vol_path = snap_file.with_extension("vol");
let marker_path = snap_file.with_extension("vol.promoted");
if marker_path.exists() {
let _ = fs::remove_file(&vol_path);
let _ = fs::remove_file(&marker_path);
}
}
if let Ok(entries) = fs::read_dir(&snapshot_dir) {
for entry in entries.flatten() {
if let Some(name_str) = entry.file_name().to_str().map(|s| s.to_string()) {
if name_str.starts_with("hnsw_") && name_str.ends_with(".bin") {
let should_keep = surviving_timestamps
.iter()
.any(|ts| name_str.ends_with(&format!("-{}.bin", ts)));
if !should_keep {
let _ = fs::remove_file(entry.path());
}
}
}
}
}
Ok(())
}
pub fn close(&self) -> Result<()> {
let mut readers = self
.readers
.write()
.expect("snapshot readers lock poisoned in close");
readers.clear();
Ok(())
}
}
/// Computes a 64-bit FNV-1a-style fingerprint of `schema`.
///
/// The table name is mixed in first. Then, for each column in order, the hash mixes the
/// column name, its data-type tag (`as_u8`), and its `nullable` and `primary_key` flags
/// (as 0/1). Characters are mixed as whole Unicode scalar values rather than UTF-8 bytes,
/// so results differ from a byte-oriented FNV-1a of the same text.
fn schema_hash(schema: &Schema) -> u64 {
    const FNV_OFFSET_BASIS: u64 = 14695981039346656037;
    const FNV_PRIME: u64 = 1099511628211;

    /// One FNV-1a round: xor in `value`, then multiply by the prime (wrapping).
    fn mix(acc: u64, value: u64) -> u64 {
        (acc ^ value).wrapping_mul(FNV_PRIME)
    }

    /// Runs one FNV-1a round per `char` of `text`.
    fn mix_text(acc: u64, text: &str) -> u64 {
        text.chars().fold(acc, |state, ch| mix(state, ch as u64))
    }

    let seeded = mix_text(FNV_OFFSET_BASIS, &schema.table_name);
    schema.columns.iter().fold(seeded, |state, column| {
        let state = mix_text(state, &column.name);
        let state = mix(state, column.data_type.as_u8() as u64);
        let state = mix(state, column.nullable as u64);
        mix(state, column.primary_key as u64)
    })
}
// Unit tests for snapshot schema (de)serialization, the SnapshotWriter/SnapshotReader
// round trip, DiskVersionStore, and schema_hash.
#[cfg(test)]
mod tests {
use super::*;
use crate::core::{Row, Value};
use tempfile::tempdir;
// Builds the three-column schema shared by the tests below: "id" Integer, "name" Text with
// default 'unnamed', and "value" Float with CHECK `value > 0`.
// NOTE(review): the three bools passed to `with_constraints` presumably are
// (nullable, primary_key, auto_increment). Confirm against its signature.
fn create_test_schema() -> Schema {
Schema::new(
"test_table",
vec![
SchemaColumn::with_constraints(
0,
"id",
DataType::Integer,
false,
true,
true,
None,
None,
),
SchemaColumn::with_constraints(
1,
"name",
DataType::Text,
true,
false,
false,
Some("'unnamed'".to_string()),
None,
),
SchemaColumn::with_constraints(
2,
"value",
DataType::Float,
true,
false,
false,
None,
Some("value > 0".to_string()),
),
],
)
}
// Serializing and then deserializing a schema must preserve the table name, the column
// count, and every compared column attribute.
#[test]
fn test_schema_serialization() {
let schema = create_test_schema();
let data = serialize_snapshot_schema(&schema);
let deserialized = deserialize_snapshot_schema(&data).unwrap();
assert_eq!(deserialized.table_name, schema.table_name);
assert_eq!(deserialized.columns.len(), schema.columns.len());
for (orig, deser) in schema.columns.iter().zip(deserialized.columns.iter()) {
assert_eq!(orig.name, deser.name);
assert_eq!(orig.data_type, deser.data_type);
assert_eq!(orig.nullable, deser.nullable);
assert_eq!(orig.primary_key, deser.primary_key);
assert_eq!(orig.auto_increment, deser.auto_increment);
assert_eq!(orig.default_expr, deser.default_expr);
assert_eq!(orig.check_expr, deser.check_expr);
}
}
// Writes 100 rows (ids 1..=100), reopens the file, and checks the row count, the schema,
// and point reads.
#[test]
fn test_snapshot_writer_reader() {
let dir = tempdir().unwrap();
let snapshot_path = dir.path().join("test_snapshot.bin");
let schema = create_test_schema();
{
let mut writer = SnapshotWriter::new(&snapshot_path).unwrap();
writer.write_schema(&schema).unwrap();
for i in 1..=100 {
let version = RowVersion::new(
1,
Row::from_values(vec![
Value::Integer(i),
Value::text(format!("row_{}", i)),
Value::Float(i as f64 * 1.5),
]),
);
writer.append_row(i, &version).unwrap();
}
writer.finalize().unwrap();
}
{
let mut reader = SnapshotReader::open(&snapshot_path).unwrap();
assert_eq!(reader.row_count(), 100);
assert_eq!(reader.schema().table_name, "test_table");
let row = reader.get_row(50).unwrap();
assert_eq!(row.data.len(), 3);
// The test expects a second fetch of the same row id to yield None, i.e. the reader
// hands each row out only once.
assert!(reader.get_row(50).is_none());
let row = reader.get_row(75).unwrap();
assert_eq!(row.data.len(), 3); }
}
// Writes 50 rows and checks that `for_each` visits each exactly once, using the visit count
// and the sum of row ids. The callback returns `true`, presumably meaning "keep iterating".
#[test]
fn test_snapshot_for_each() {
let dir = tempdir().unwrap();
let snapshot_path = dir.path().join("foreach_snapshot.bin");
let schema = create_test_schema();
{
let mut writer = SnapshotWriter::new(&snapshot_path).unwrap();
writer.write_schema(&schema).unwrap();
for i in 1..=50 {
let version = RowVersion::new(
1,
Row::from_values(vec![
Value::Integer(i),
Value::text(format!("item_{}", i)),
Value::Float(i as f64),
]),
);
writer.append_row(i, &version).unwrap();
}
writer.finalize().unwrap();
}
{
let mut reader = SnapshotReader::open(&snapshot_path).unwrap();
let mut count = 0;
let mut sum = 0i64;
reader
.for_each(|row_id, version| {
count += 1;
sum += row_id;
assert_eq!(version.data.len(), 3); true
})
.unwrap();
assert_eq!(count, 50);
assert_eq!(sum, (1..=50).sum::<i64>());
}
}
// A freshly created store with no snapshot files loads successfully and has no rows.
#[test]
fn test_disk_version_store() {
let dir = tempdir().unwrap();
let schema = create_test_schema();
let dvs = DiskVersionStore::new(dir.path(), "test_table", &schema).unwrap();
assert!(dvs.load_snapshots().is_ok());
assert!(dvs.get_row(1).is_none());
}
// schema_hash is deterministic for equal schemas and changes when the table name changes.
#[test]
fn test_schema_hash() {
let schema1 = create_test_schema();
let schema2 = create_test_schema();
assert_eq!(schema_hash(&schema1), schema_hash(&schema2));
let mut schema3 = create_test_schema();
schema3.table_name = "different_table".to_string();
assert_ne!(schema_hash(&schema1), schema_hash(&schema3));
}
}