use std::collections::HashSet;
use std::fs::File;
use std::io::{Read, Seek, SeekFrom};
use std::marker::PhantomData;
use std::ops::Range;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use byteorder::{ByteOrder, LittleEndian};
/// Magic number expected in the first 8 bytes of the file header, read as a little-endian `u64`.
/// The six high-order bytes of the value are the ASCII codes for "TOONDB"; the low 16 bits are `0x0001`.
pub const SOCHDB_MAGIC: u64 = 0x544F4F4E44420001;
/// Magic number expected in the first 4 bytes of every edge record (little-endian `u32`).
pub const EDGE_MAGIC: u32 = 0xED6E0001;
/// Size in bytes of one fixed-width edge record; the last 4 bytes hold a CRC32 of the preceding bytes.
pub const EDGE_SIZE: usize = 128;
/// Size in bytes of the header read from offset 0 of the file.
pub const HEADER_SIZE: usize = 64;
/// Size in bytes of the footer read from the end of the file; its first 32 bytes hold the stored checksum.
pub const FOOTER_SIZE: usize = 144;
/// Smallest file the validator accepts: one header, one edge record and one footer.
pub const MIN_FILE_SIZE: u64 = (HEADER_SIZE + EDGE_SIZE + FOOTER_SIZE) as u64;
/// Upper bound on file size used by the built-in configurations (10 GiB).
pub const MAX_FILE_SIZE: u64 = 10 * 1024 * 1024 * 1024;
/// Header version numbers the validator accepts.
pub const SUPPORTED_VERSIONS: &[u32] = &[1, 2];
/// Everything that can go wrong while validating a file or accessing validated data.
#[derive(Debug, Clone)]
pub enum ValidationError {
/// The file is smaller than the minimum size the format allows.
FileTooSmall { actual: u64, minimum: u64 },
/// The file is larger than the configured maximum size.
FileTooLarge { actual: u64, maximum: u64 },
/// The magic number in the header does not match the expected value.
BadMagic { expected: u64, actual: u64 },
/// The header version is not one of the supported versions.
UnsupportedVersion { version: u32, supported: Vec<u32> },
/// The checksum stored in the footer (`expected`) differs from the one
/// computed over the file contents (`actual`).
ChecksumMismatch {
expected: [u8; 32],
actual: [u8; 32],
},
/// An edge record failed validation; `index` is the edge's position and
/// `reason` a human-readable explanation.
CorruptedEdge { index: usize, reason: String },
/// An offset lies beyond the permitted maximum.
InvalidOffset { offset: u64, max: u64 },
/// A region of `length` bytes starting at `offset` extends past `max`.
InvalidLength { offset: u64, length: u64, max: u64 },
/// An offset is not a multiple of the required alignment.
AlignmentViolation {
offset: u64,
required_alignment: usize,
},
/// An in-memory access of `length` bytes at `offset` does not fit in a
/// region of `region_size` bytes.
OutOfBounds {
offset: usize,
length: usize,
region_size: usize,
},
/// An underlying I/O operation failed; carries the error's display text.
IoError(String),
/// Fewer bytes are available than the file was expected to contain.
TruncatedFile { expected: u64, actual: u64 },
}
/// Human-readable, single-line rendering of every [`ValidationError`] variant.
impl std::fmt::Display for ValidationError {
    /// Writes the message for `self` into `f`.
    ///
    /// Formatting never panics: the end of an `OutOfBounds` range is computed
    /// with saturating arithmetic, because the offending offset may be
    /// arbitrarily close to `usize::MAX`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::FileTooSmall { actual, minimum } => {
                write!(f, "File too small: {} bytes (minimum: {})", actual, minimum)
            }
            Self::FileTooLarge { actual, maximum } => {
                write!(f, "File too large: {} bytes (maximum: {})", actual, maximum)
            }
            Self::BadMagic { expected, actual } => {
                write!(f, "Bad magic: {:#x} (expected {:#x})", actual, expected)
            }
            Self::UnsupportedVersion { version, supported } => {
                write!(
                    f,
                    "Unsupported version: {} (supported: {:?})",
                    version, supported
                )
            }
            Self::ChecksumMismatch { expected, actual } => {
                write!(
                    f,
                    "Checksum mismatch: {} vs {}",
                    hex::encode(expected),
                    hex::encode(actual)
                )
            }
            Self::CorruptedEdge { index, reason } => {
                write!(f, "Corrupted edge at index {}: {}", index, reason)
            }
            Self::InvalidOffset { offset, max } => {
                write!(f, "Invalid offset: {} (max: {})", offset, max)
            }
            Self::InvalidLength {
                offset,
                length,
                max,
            } => {
                write!(
                    f,
                    "Invalid length: {} at offset {} (max: {})",
                    length, offset, max
                )
            }
            Self::AlignmentViolation {
                offset,
                required_alignment,
            } => {
                write!(
                    f,
                    "Alignment violation at {}: required {} byte alignment",
                    offset, required_alignment
                )
            }
            Self::OutOfBounds {
                offset,
                length,
                region_size,
            } => {
                // An out-of-bounds offset can be huge; a plain `+` here would
                // panic (debug) or wrap (release) while reporting the error.
                let end = offset.saturating_add(*length);
                write!(
                    f,
                    "Out of bounds: [{}..{}] in region of size {}",
                    offset, end, region_size
                )
            }
            Self::IoError(e) => write!(f, "I/O error: {}", e),
            Self::TruncatedFile { expected, actual } => {
                write!(
                    f,
                    "File truncated: expected {} bytes, got {}",
                    expected, actual
                )
            }
        }
    }
}
// Marker impl so `ValidationError` can be used wherever `std::error::Error` is expected;
// no methods are overridden.
impl std::error::Error for ValidationError {}
impl From<std::io::Error> for ValidationError {
fn from(e: std::io::Error) -> Self {
ValidationError::IoError(e.to_string())
}
}
/// Monotonic counters describing validation activity.
///
/// Every field is an atomic, so a single instance can be shared behind an
/// `Arc` and updated through `&self`. All updates use relaxed ordering.
#[derive(Debug, Default)]
pub struct ValidationMetrics {
    /// Number of validation runs recorded (successful or not).
    pub files_validated: AtomicU64,
    /// Number of validation runs recorded as failed.
    pub validation_failures: AtomicU64,
    /// Number of edge samples recorded.
    pub edges_sampled: AtomicU64,
    /// Number of recorded edge samples that were flagged as corrupted.
    pub corrupted_edges_detected: AtomicU64,
    /// Number of recorded bounds violations.
    pub bounds_violations: AtomicU64,
    /// Sum of all recorded validation durations, in microseconds.
    pub validation_time_us: AtomicU64,
}

impl ValidationMetrics {
    /// Adds `amount` to `counter` with relaxed ordering.
    fn bump(counter: &AtomicU64, amount: u64) {
        counter.fetch_add(amount, Ordering::Relaxed);
    }

    /// Creates a metrics set with every counter at zero.
    pub fn new() -> Self {
        Default::default()
    }

    /// Records one validation run that took `duration_us` microseconds;
    /// the failure counter is incremented when `success` is `false`.
    pub fn record_validation(&self, success: bool, duration_us: u64) {
        Self::bump(&self.files_validated, 1);
        Self::bump(&self.validation_time_us, duration_us);
        if success {
            return;
        }
        Self::bump(&self.validation_failures, 1);
    }

    /// Records one sampled edge, additionally counting it as corrupted when
    /// `corrupted` is `true`.
    pub fn record_sample(&self, corrupted: bool) {
        Self::bump(&self.edges_sampled, 1);
        if !corrupted {
            return;
        }
        Self::bump(&self.corrupted_edges_detected, 1);
    }

    /// Records one bounds violation.
    pub fn record_bounds_violation(&self) {
        Self::bump(&self.bounds_violations, 1);
    }
}
#[derive(Debug, Clone)]
pub struct ValidationConfig {
pub full_checksum: bool,
pub sample_count: usize,
pub max_file_size: u64,
pub check_alignment: bool,
pub required_alignment: usize,
}
impl Default for ValidationConfig {
fn default() -> Self {
Self {
full_checksum: false,
sample_count: 100, max_file_size: MAX_FILE_SIZE,
check_alignment: true,
required_alignment: 8,
}
}
}
impl ValidationConfig {
pub fn high_security() -> Self {
Self {
full_checksum: true,
sample_count: 500,
max_file_size: MAX_FILE_SIZE,
check_alignment: true,
required_alignment: 8,
}
}
pub fn fast() -> Self {
Self {
full_checksum: false,
sample_count: 0,
max_file_size: MAX_FILE_SIZE,
check_alignment: false,
required_alignment: 1,
}
}
pub fn optimal_sample_count(delta: f64, epsilon: f64) -> usize {
((1.0 / delta).ln() / epsilon).ceil() as usize
}
}
/// Validates a file on disk according to a `ValidationConfig` and records the
/// outcome in a shared `ValidationMetrics`.
pub struct FileValidator {
// Settings that select which checks run and their limits.
config: ValidationConfig,
// Counters updated by every validation run; shared via `Arc` so callers can hold a handle too.
metrics: Arc<ValidationMetrics>,
}
impl FileValidator {
    /// Creates a validator with its own, freshly zeroed metrics.
    pub fn new(config: ValidationConfig) -> Self {
        Self::with_metrics(config, Arc::new(ValidationMetrics::new()))
    }

    /// Creates a validator that reports into an existing metrics instance.
    pub fn with_metrics(config: ValidationConfig, metrics: Arc<ValidationMetrics>) -> Self {
        Self { config, metrics }
    }

    /// Returns the metrics this validator reports into.
    pub fn metrics(&self) -> &Arc<ValidationMetrics> {
        &self.metrics
    }

    /// Validates the file at `path` and returns the metadata parsed from its header.
    ///
    /// The outcome and the elapsed time are recorded in the metrics whether
    /// validation succeeds or fails.
    ///
    /// # Errors
    ///
    /// Returns the first [`ValidationError`] encountered: I/O failure, size
    /// out of range, bad magic, unsupported version, an edge count that does
    /// not fit in the file, checksum mismatch, or a corrupted sampled edge.
    pub fn validate_before_mmap(
        &self,
        path: &Path,
    ) -> std::result::Result<FileMetadata, ValidationError> {
        let start = std::time::Instant::now();
        let result = self.validate_impl(path);
        // `as_micros` yields a u128; clamp rather than silently truncate.
        let duration_us = u64::try_from(start.elapsed().as_micros()).unwrap_or(u64::MAX);
        self.metrics.record_validation(result.is_ok(), duration_us);
        result
    }

    /// Runs the checks in order: size limits, header magic and version,
    /// edge-count plausibility, footer read, optional full checksum,
    /// optional edge sampling.
    ///
    /// NOTE(review): `check_alignment` / `required_alignment` from the
    /// configuration are not consulted anywhere in this implementation.
    fn validate_impl(&self, path: &Path) -> std::result::Result<FileMetadata, ValidationError> {
        let mut file = File::open(path)?;
        let file_size = file.metadata()?.len();
        if file_size < MIN_FILE_SIZE {
            return Err(ValidationError::FileTooSmall {
                actual: file_size,
                minimum: MIN_FILE_SIZE,
            });
        }
        if file_size > self.config.max_file_size {
            return Err(ValidationError::FileTooLarge {
                actual: file_size,
                maximum: self.config.max_file_size,
            });
        }

        file.seek(SeekFrom::Start(0))?;
        let mut header = [0u8; HEADER_SIZE];
        file.read_exact(&mut header)?;

        let magic = LittleEndian::read_u64(&header[0..8]);
        if magic != SOCHDB_MAGIC {
            return Err(ValidationError::BadMagic {
                expected: SOCHDB_MAGIC,
                actual: magic,
            });
        }
        let version = LittleEndian::read_u32(&header[8..12]);
        if !SUPPORTED_VERSIONS.contains(&version) {
            return Err(ValidationError::UnsupportedVersion {
                version,
                supported: SUPPORTED_VERSIONS.to_vec(),
            });
        }

        // The edge count comes straight from the (untrusted) header. Reject
        // it unless the edge region fits between the header and the footer;
        // the multiplication is checked so a huge count cannot overflow.
        let num_edges = LittleEndian::read_u64(&header[16..24]);
        let data_offset = HEADER_SIZE as u64;
        // Cannot underflow: file_size >= MIN_FILE_SIZE > HEADER_SIZE + FOOTER_SIZE.
        let max_data_length = file_size - (HEADER_SIZE + FOOTER_SIZE) as u64;
        let data_length = num_edges
            .checked_mul(EDGE_SIZE as u64)
            .filter(|&len| len <= max_data_length)
            .ok_or(ValidationError::InvalidLength {
                offset: data_offset,
                length: num_edges.saturating_mul(EDGE_SIZE as u64),
                max: max_data_length,
            })?;

        // The footer is always read, so an unreadable tail is reported even
        // when the checksum itself is not verified.
        file.seek(SeekFrom::End(-(FOOTER_SIZE as i64)))?;
        let mut footer = [0u8; FOOTER_SIZE];
        file.read_exact(&mut footer)?;

        if self.config.full_checksum {
            let mut expected_checksum = [0u8; 32];
            expected_checksum.copy_from_slice(&footer[..32]);
            let actual_checksum =
                self.compute_checksum(&mut file, file_size - FOOTER_SIZE as u64)?;
            if expected_checksum != actual_checksum {
                return Err(ValidationError::ChecksumMismatch {
                    expected: expected_checksum,
                    actual: actual_checksum,
                });
            }
        }

        if self.config.sample_count > 0 && num_edges > 0 {
            self.validate_edge_samples(&mut file, data_offset, num_edges)?;
        }

        Ok(FileMetadata {
            file_size,
            version,
            num_edges,
            data_offset,
            data_length,
        })
    }

    /// Computes the BLAKE3 hash of the first `length` bytes of `file`,
    /// reading in 64 KiB chunks.
    fn compute_checksum(
        &self,
        file: &mut File,
        length: u64,
    ) -> std::result::Result<[u8; 32], ValidationError> {
        file.seek(SeekFrom::Start(0))?;
        let mut hasher = blake3::Hasher::new();
        let mut buffer = vec![0u8; 64 * 1024];
        let mut remaining = length;
        while remaining > 0 {
            let to_read = remaining.min(buffer.len() as u64) as usize;
            file.read_exact(&mut buffer[..to_read])?;
            hasher.update(&buffer[..to_read]);
            remaining -= to_read as u64;
        }
        Ok(*hasher.finalize().as_bytes())
    }

    /// Spot-checks up to `sample_count` distinct edge records.
    ///
    /// Indices are drawn from a fixed-seed linear congruential generator, so
    /// the same file always yields the same sample. Edges are visited in
    /// ascending index order, which makes the reported corrupted edge
    /// deterministic. Every visited edge is recorded in the metrics.
    fn validate_edge_samples(
        &self,
        file: &mut File,
        data_offset: u64,
        num_edges: u64,
    ) -> std::result::Result<(), ValidationError> {
        if num_edges == 0 {
            // Nothing to sample; also keeps the modulo below well-defined.
            return Ok(());
        }
        // If the edge count does not fit in usize it certainly exceeds the
        // configured sample count, so the latter is the minimum.
        let sample_count = usize::try_from(num_edges)
            .map_or(self.config.sample_count, |n| n.min(self.config.sample_count));

        let mut sampled: HashSet<u64> = HashSet::with_capacity(sample_count);
        let mut seed = 0x12345678u64;
        let prime = 0x9E3779B97F4A7C15u64;
        while sampled.len() < sample_count {
            seed = seed.wrapping_mul(prime).wrapping_add(1);
            sampled.insert(seed % num_edges);
        }
        let mut indices: Vec<u64> = sampled.into_iter().collect();
        indices.sort_unstable();

        let mut edge_buffer = [0u8; EDGE_SIZE];
        for idx in indices {
            // idx < num_edges, and the caller has verified that
            // num_edges * EDGE_SIZE fits inside the file, so this cannot overflow.
            let edge_offset = data_offset + idx * EDGE_SIZE as u64;
            file.seek(SeekFrom::Start(edge_offset))?;
            file.read_exact(&mut edge_buffer)?;

            let index = usize::try_from(idx).unwrap_or(usize::MAX);
            let corrupted = !self.validate_edge(&edge_buffer, index);
            self.metrics.record_sample(corrupted);
            if corrupted {
                return Err(ValidationError::CorruptedEdge {
                    index,
                    reason: "Edge validation failed".to_string(),
                });
            }
        }
        Ok(())
    }

    /// Returns `true` when the record starts with `EDGE_MAGIC` and its
    /// trailing 4-byte CRC32 matches the CRC32 of the preceding bytes.
    fn validate_edge(&self, edge_bytes: &[u8; EDGE_SIZE], _index: usize) -> bool {
        let edge_magic = LittleEndian::read_u32(&edge_bytes[0..4]);
        if edge_magic != EDGE_MAGIC {
            return false;
        }
        let expected_crc = LittleEndian::read_u32(&edge_bytes[EDGE_SIZE - 4..]);
        let actual_crc = crc32fast::hash(&edge_bytes[..EDGE_SIZE - 4]);
        expected_crc == actual_crc
    }
}
/// Facts about a file gathered while validating it.
#[derive(Debug, Clone)]
pub struct FileMetadata {
/// Total size of the file in bytes, as reported by the filesystem.
pub file_size: u64,
/// Format version read from the header.
pub version: u32,
/// Number of edge records, as declared in the header.
pub num_edges: u64,
/// Byte offset at which the edge records start (the header size).
pub data_offset: u64,
/// Length in bytes of the edge region, derived from `num_edges` and the edge size.
pub data_length: u64,
}
/// Borrowed, zero-copy view of a single fixed-size edge record.
pub struct EdgeRef<'a> {
// The raw bytes of the record, borrowed from the backing buffer.
bytes: &'a [u8; EDGE_SIZE],
// Carries the same lifetime as `bytes`; holds no data.
_marker: PhantomData<&'a ()>,
}
impl<'a> EdgeRef<'a> {
    /// Creates a view of the `EDGE_SIZE` bytes of `data` starting at `offset`.
    ///
    /// # Errors
    ///
    /// * [`ValidationError::OutOfBounds`] if the record does not fit inside
    ///   `data`, including the case where `offset + EDGE_SIZE` would overflow.
    /// * [`ValidationError::CorruptedEdge`] if the record does not start
    ///   with `EDGE_MAGIC`. The CRC is *not* checked here; see
    ///   [`EdgeRef::verify_crc`].
    pub fn new_checked(
        data: &'a [u8],
        offset: usize,
    ) -> std::result::Result<Self, ValidationError> {
        // `checked_add` + `get` reject every out-of-range request, so no
        // arithmetic or slicing below can panic.
        let bytes: &'a [u8; EDGE_SIZE] = offset
            .checked_add(EDGE_SIZE)
            .and_then(|end| data.get(offset..end))
            .and_then(|slice| <&[u8; EDGE_SIZE]>::try_from(slice).ok())
            .ok_or(ValidationError::OutOfBounds {
                offset,
                length: EDGE_SIZE,
                region_size: data.len(),
            })?;

        let magic = LittleEndian::read_u32(&bytes[0..4]);
        if magic != EDGE_MAGIC {
            return Err(ValidationError::CorruptedEdge {
                index: offset / EDGE_SIZE,
                reason: format!("Bad edge magic: {:#x}", magic),
            });
        }
        Ok(Self {
            bytes,
            _marker: PhantomData,
        })
    }

    /// Creates a view without checking the edge magic.
    ///
    /// # Safety
    ///
    /// The caller is expected to pass an `offset` at which a full edge record
    /// starts. The slice access itself is still bounds-checked.
    ///
    /// # Panics
    ///
    /// Panics if `offset..offset + EDGE_SIZE` is not a valid range of `data`.
    pub unsafe fn new_unchecked(data: &'a [u8], offset: usize) -> Self {
        let slice = &data[offset..offset + EDGE_SIZE];
        // SAFETY: `slice` was produced by indexing with a range of length
        // `EDGE_SIZE`, so converting it to `&[u8; EDGE_SIZE]` cannot fail.
        let bytes: &[u8; EDGE_SIZE] =
            unsafe { <&[u8; EDGE_SIZE]>::try_from(slice).unwrap_unchecked() };
        Self {
            bytes,
            _marker: PhantomData,
        }
    }

    /// Returns the raw bytes of the record.
    pub fn as_bytes(&self) -> &[u8; EDGE_SIZE] {
        self.bytes
    }

    /// Little-endian `u64` stored in bytes 4..12.
    pub fn source_id(&self) -> u64 {
        LittleEndian::read_u64(&self.bytes[4..12])
    }

    /// Little-endian `u64` stored in bytes 12..20.
    pub fn target_id(&self) -> u64 {
        LittleEndian::read_u64(&self.bytes[12..20])
    }

    /// Little-endian `f64` stored in bytes 20..28.
    pub fn weight(&self) -> f64 {
        LittleEndian::read_f64(&self.bytes[20..28])
    }

    /// Little-endian `u32` stored in bytes 28..32.
    pub fn edge_type(&self) -> u32 {
        LittleEndian::read_u32(&self.bytes[28..32])
    }

    /// Little-endian `u64` stored in bytes 32..40.
    pub fn timestamp(&self) -> u64 {
        LittleEndian::read_u64(&self.bytes[32..40])
    }

    /// Returns the payload region described by the record.
    ///
    /// The payload's offset and length (both relative to the start of the
    /// record) are read from bytes 40..44 and 44..48.
    ///
    /// # Errors
    ///
    /// Returns [`ValidationError::InvalidOffset`] if the described region
    /// would reach into the trailing 4-byte CRC or beyond, including when
    /// offset + length overflows.
    pub fn payload_bytes(&self) -> std::result::Result<&'a [u8], ValidationError> {
        const PAYLOAD_LIMIT: usize = EDGE_SIZE - 4;
        let payload_offset = LittleEndian::read_u32(&self.bytes[40..44]) as usize;
        let payload_length = LittleEndian::read_u32(&self.bytes[44..48]) as usize;

        // Both values are attacker-controlled u32s; on 32-bit targets their
        // sum can overflow usize, so add with overflow detection.
        let payload_end = payload_offset
            .checked_add(payload_length)
            .filter(|&end| end <= PAYLOAD_LIMIT)
            .ok_or(ValidationError::InvalidOffset {
                offset: payload_offset as u64,
                max: PAYLOAD_LIMIT as u64,
            })?;

        let bytes: &'a [u8; EDGE_SIZE] = self.bytes;
        Ok(&bytes[payload_offset..payload_end])
    }

    /// Returns `true` when the trailing 4-byte CRC32 matches the CRC32 of
    /// all preceding bytes of the record.
    pub fn verify_crc(&self) -> bool {
        let expected_crc = LittleEndian::read_u32(&self.bytes[EDGE_SIZE - 4..]);
        let actual_crc = crc32fast::hash(&self.bytes[..EDGE_SIZE - 4]);
        expected_crc == actual_crc
    }
}
/// A file's contents together with the metadata produced by validating it.
///
/// Despite the name, the bytes are held in an owned `Vec<u8>`.
pub struct ValidatedMmap {
// `data` holds the file contents; `metadata` is the result of validating the file.
data: Vec<u8>, metadata: FileMetadata,
// Cleared by `invalidate`; accessors refuse to hand out data once it is false.
is_valid: AtomicBool,
// Counters shared with the validator that produced `metadata`.
metrics: Arc<ValidationMetrics>,
}
impl ValidatedMmap {
    /// Validates the file at `path` with `config`, then loads its contents.
    ///
    /// # Errors
    ///
    /// Returns any error from validation or from reading the file.
    /// Additionally returns [`ValidationError::TruncatedFile`] if fewer bytes
    /// could be read than the validated file size (the file is reopened
    /// after validation, so it may have shrunk in between), and
    /// [`ValidationError::FileTooLarge`] if the size does not fit in `usize`.
    pub fn open(
        path: &Path,
        config: ValidationConfig,
    ) -> std::result::Result<Self, ValidationError> {
        let validator = FileValidator::new(config);
        let metadata = validator.validate_before_mmap(path)?;

        let capacity =
            usize::try_from(metadata.file_size).map_err(|_| ValidationError::FileTooLarge {
                actual: metadata.file_size,
                maximum: usize::MAX as u64,
            })?;
        let mut file = File::open(path)?;
        let mut data = Vec::with_capacity(capacity);
        file.read_to_end(&mut data)?;

        // The metadata describes the file as it was during validation; a
        // shorter buffer means the edge region can no longer be trusted.
        let loaded = data.len() as u64;
        if loaded < metadata.file_size {
            return Err(ValidationError::TruncatedFile {
                expected: metadata.file_size,
                actual: loaded,
            });
        }

        Ok(Self {
            data,
            metadata,
            is_valid: AtomicBool::new(true),
            metrics: validator.metrics,
        })
    }

    /// Returns the metadata produced by validation.
    pub fn metadata(&self) -> &FileMetadata {
        &self.metadata
    }

    /// Returns `false` once [`ValidatedMmap::invalidate`] has been called.
    pub fn is_valid(&self) -> bool {
        self.is_valid.load(Ordering::Acquire)
    }

    /// Builds the error returned by accessors after invalidation.
    fn invalidated_error(&self) -> ValidationError {
        ValidationError::TruncatedFile {
            expected: self.metadata.file_size,
            actual: self.data.len() as u64,
        }
    }

    /// Returns a checked view of the edge at `index`.
    ///
    /// # Errors
    ///
    /// * [`ValidationError::TruncatedFile`] if the map has been invalidated.
    /// * [`ValidationError::OutOfBounds`] if `index` is not below the edge
    ///   count or its byte offset is not representable (a bounds violation
    ///   is recorded in the metrics).
    /// * Any error from [`EdgeRef::new_checked`].
    pub fn get_edge(&self, index: usize) -> std::result::Result<EdgeRef<'_>, ValidationError> {
        if !self.is_valid() {
            return Err(self.invalidated_error());
        }

        // Checked arithmetic: `index` is caller-supplied and may be huge.
        let offset = usize::try_from(self.metadata.data_offset)
            .ok()
            .and_then(|base| index.checked_mul(EDGE_SIZE)?.checked_add(base));

        match offset {
            Some(offset) if index < self.num_edges() => EdgeRef::new_checked(&self.data, offset),
            _ => {
                self.metrics.record_bounds_violation();
                Err(ValidationError::OutOfBounds {
                    offset: offset.unwrap_or(usize::MAX),
                    length: EDGE_SIZE,
                    region_size: self.data.len(),
                })
            }
        }
    }

    /// Returns the bytes in `range`.
    ///
    /// # Errors
    ///
    /// * [`ValidationError::TruncatedFile`] if the map has been invalidated.
    /// * [`ValidationError::OutOfBounds`] if the range ends past the data or
    ///   its start is greater than its end (a bounds violation is recorded).
    pub fn slice(&self, range: Range<usize>) -> std::result::Result<&[u8], ValidationError> {
        if !self.is_valid() {
            return Err(self.invalidated_error());
        }
        let (start, end) = (range.start, range.end);
        match self.data.get(range) {
            Some(bytes) => Ok(bytes),
            None => {
                self.metrics.record_bounds_violation();
                Err(ValidationError::OutOfBounds {
                    offset: start,
                    // Saturate so an inverted range reports length 0 instead of underflowing.
                    length: end.saturating_sub(start),
                    region_size: self.data.len(),
                })
            }
        }
    }

    /// Returns an iterator that yields the result of `get_edge` for every
    /// index from 0 up to the edge count.
    pub fn iter_edges(&self) -> ValidatedEdgeIterator<'_> {
        ValidatedEdgeIterator {
            mmap: self,
            current_index: 0,
        }
    }

    /// Returns the edge count from the metadata, clamped to `usize::MAX`.
    pub fn num_edges(&self) -> usize {
        usize::try_from(self.metadata.num_edges).unwrap_or(usize::MAX)
    }

    /// Marks the map as invalid; subsequent `get_edge` / `slice` calls fail.
    pub fn invalidate(&self) {
        self.is_valid.store(false, Ordering::Release);
    }

    /// Walks every edge and returns how many have a matching CRC.
    ///
    /// # Errors
    ///
    /// Stops at and returns the first error from `get_edge`. A CRC mismatch
    /// is not an error; such an edge is simply not counted.
    pub fn verify_all(&self) -> std::result::Result<usize, ValidationError> {
        let mut valid_count = 0;
        for edge in self.iter_edges() {
            if edge?.verify_crc() {
                valid_count += 1;
            }
        }
        Ok(valid_count)
    }
}
/// Iterator over the edges of a `ValidatedMmap`, in index order.
pub struct ValidatedEdgeIterator<'a> {
// The map whose edges are being iterated.
mmap: &'a ValidatedMmap,
// Index of the next edge to yield.
current_index: usize,
}
impl<'a> Iterator for ValidatedEdgeIterator<'a> {
type Item = std::result::Result<EdgeRef<'a>, ValidationError>;
fn next(&mut self) -> Option<Self::Item> {
if self.current_index >= self.mmap.num_edges() {
return None;
}
let result = self.mmap.get_edge(self.current_index);
self.current_index += 1;
Some(result)
}
fn size_hint(&self) -> (usize, Option<usize>) {
let remaining = self.mmap.num_edges() - self.current_index;
(remaining, Some(remaining))
}
}
impl<'a> ExactSizeIterator for ValidatedEdgeIterator<'a> {}
/// Checks that the region `[offset, offset + length)` lies within `[0, max]`.
///
/// # Errors
///
/// * [`ValidationError::InvalidOffset`] if `offset` is greater than `max`.
/// * [`ValidationError::InvalidLength`] if `offset + length` is greater than
///   `max`, or cannot be represented in a `u64` at all.
#[inline]
pub fn validate_offset_length(
    offset: u64,
    length: u64,
    max: u64,
) -> std::result::Result<(), ValidationError> {
    if offset > max {
        return Err(ValidationError::InvalidOffset { offset, max });
    }
    // A plain `offset + length` would panic in debug builds and wrap around
    // in release builds (accepting a bogus region); treat overflow as too long.
    match offset.checked_add(length) {
        Some(end) if end <= max => Ok(()),
        _ => Err(ValidationError::InvalidLength {
            offset,
            length,
            max,
        }),
    }
}
/// Checks that `offset` is a multiple of `alignment`.
///
/// The test is carried out in `u64`, so offsets above `usize::MAX` are not
/// truncated on 32-bit targets. An `alignment` of 0 is satisfied only by an
/// offset of 0.
///
/// # Errors
///
/// Returns [`ValidationError::AlignmentViolation`] if `offset` is not a
/// multiple of `alignment`.
#[inline]
pub fn validate_alignment(
    offset: u64,
    alignment: usize,
) -> std::result::Result<(), ValidationError> {
    // usize -> u64 is a lossless widening on every supported target.
    if offset.is_multiple_of(alignment as u64) {
        Ok(())
    } else {
        Err(ValidationError::AlignmentViolation {
            offset,
            required_alignment: alignment,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Write;
    use tempfile::NamedTempFile;

    /// Number of edge records in the fixture built by `create_valid_test_file`.
    const FIXTURE_EDGE_COUNT: u64 = 2;

    /// Encodes one well-formed edge record: magic, source id `source`,
    /// target id `source + 1`, zero padding, and a matching trailing CRC32.
    fn encode_edge(source: u64) -> [u8; EDGE_SIZE] {
        let mut record = [0u8; EDGE_SIZE];
        LittleEndian::write_u32(&mut record[0..4], EDGE_MAGIC);
        LittleEndian::write_u64(&mut record[4..12], source);
        LittleEndian::write_u64(&mut record[12..20], source + 1);
        let checksum = crc32fast::hash(&record[..EDGE_SIZE - 4]);
        LittleEndian::write_u32(&mut record[EDGE_SIZE - 4..], checksum);
        record
    }

    /// Writes `contents` to a fresh temporary file and flushes it.
    fn temp_file_with(contents: &[u8]) -> NamedTempFile {
        let mut tmp = NamedTempFile::new().unwrap();
        tmp.write_all(contents).unwrap();
        tmp.flush().unwrap();
        tmp
    }

    /// Builds a version-1 file with two valid edges and an all-zero footer.
    fn create_valid_test_file() -> NamedTempFile {
        let mut image = vec![0u8; HEADER_SIZE];
        LittleEndian::write_u64(&mut image[0..8], SOCHDB_MAGIC);
        LittleEndian::write_u32(&mut image[8..12], 1);
        LittleEndian::write_u64(&mut image[16..24], FIXTURE_EDGE_COUNT);
        for source in 0..FIXTURE_EDGE_COUNT {
            image.extend_from_slice(&encode_edge(source));
        }
        image.extend_from_slice(&[0u8; FOOTER_SIZE]);
        temp_file_with(&image)
    }

    /// A 100-byte file is below the minimum size and must be rejected.
    #[test]
    fn test_file_too_small() {
        let tmp = temp_file_with(&[0u8; 100]);
        let validator = FileValidator::new(ValidationConfig::default());
        let outcome = validator.validate_before_mmap(tmp.path());
        assert!(matches!(outcome, Err(ValidationError::FileTooSmall { .. })));
    }

    /// A minimum-size file whose header magic is wrong must be rejected.
    #[test]
    fn test_bad_magic() {
        let mut image = vec![0u8; MIN_FILE_SIZE as usize];
        LittleEndian::write_u64(&mut image[0..8], 0xDEADBEEF);
        let tmp = temp_file_with(&image);
        let validator = FileValidator::new(ValidationConfig::fast());
        let outcome = validator.validate_before_mmap(tmp.path());
        assert!(matches!(outcome, Err(ValidationError::BadMagic { .. })));
    }

    /// The fixture validates and reports the version and edge count written.
    #[test]
    fn test_valid_file() {
        let tmp = create_valid_test_file();
        let validator = FileValidator::new(ValidationConfig::fast());
        let outcome = validator.validate_before_mmap(tmp.path());
        assert!(outcome.is_ok());
        let FileMetadata {
            version, num_edges, ..
        } = outcome.unwrap();
        assert_eq!(version, 1);
        assert_eq!(num_edges, 2);
    }

    /// In-range edge lookups succeed; an index past the end is out of bounds.
    #[test]
    fn test_edge_ref_bounds_check() {
        let tmp = create_valid_test_file();
        let mapped = ValidatedMmap::open(tmp.path(), ValidationConfig::fast()).unwrap();
        assert!(mapped.get_edge(0).is_ok());
        assert!(matches!(
            mapped.get_edge(100),
            Err(ValidationError::OutOfBounds { .. })
        ));
    }

    /// The CRC written by the fixture verifies.
    #[test]
    fn test_edge_ref_crc_verification() {
        let tmp = create_valid_test_file();
        let mapped = ValidatedMmap::open(tmp.path(), ValidationConfig::fast()).unwrap();
        let first = mapped.get_edge(0).unwrap();
        assert!(first.verify_crc());
    }

    /// Iterating the fixture yields exactly two successful edge lookups.
    #[test]
    fn test_validated_iterator() {
        let tmp = create_valid_test_file();
        let mapped = ValidatedMmap::open(tmp.path(), ValidationConfig::fast()).unwrap();
        let collected: Vec<_> = mapped.iter_edges().collect();
        assert_eq!(collected.len(), 2);
        assert!(collected.iter().all(|item| item.is_ok()));
    }

    /// ln(100) / 0.01 is roughly 460.5, so the result lands in 460..=470.
    #[test]
    fn test_optimal_sample_count() {
        let samples = ValidationConfig::optimal_sample_count(0.01, 0.01);
        assert!((460..=470).contains(&samples));
    }
}