use std::ops::Range;
use std::sync::Arc;
use crate::connection::SochConnection;
use crate::error::Result;
use crate::path_query::Predicate;
/// Numeric id assigned to a (table, column) pair by `Storage::resolve_column_id`.
pub type ColumnId = u32;
/// Cumulative I/O counters collected by `Storage`.
///
/// All counters start at zero (`Default`) and only ever grow until
/// `Storage::reset_stats` zeroes them again.
#[derive(Debug, Clone, Default)]
pub struct StorageStats {
    /// Total bytes returned by successful reads.
    pub bytes_read: u64,
    /// Total bytes submitted by writes and flushes.
    pub bytes_written: u64,
    /// Columns touched by scans.
    pub columns_scanned: u64,
    /// Rows touched by scans.
    pub rows_scanned: u64,
    /// Point lookups issued.
    pub blocks_read: u64,
    /// Lookups that found a value.
    pub cache_hits: u64,
    /// Lookups that found nothing.
    pub cache_misses: u64,
}
impl StorageStats {
    /// Fraction of lookups that hit, in `[0.0, 1.0]`.
    ///
    /// Returns `0.0` when no lookups have been recorded yet, avoiding
    /// a 0/0 division.
    pub fn cache_hit_rate(&self) -> f64 {
        match self.cache_hits + self.cache_misses {
            0 => 0.0,
            total => self.cache_hits as f64 / total as f64,
        }
    }
}
/// Iterator over scan results.
///
/// Currently a placeholder that yields `remaining` empty byte vectors;
/// real column data is not wired through yet.
pub struct ColumnIterator {
    // Items left to yield.
    remaining: usize,
    // Items yielded so far. Internal bookkeeping only; not exposed by
    // any accessor in this file.
    position: usize,
}
impl ColumnIterator {
    /// Creates an iterator that will yield exactly `count` items.
    pub fn new(count: usize) -> Self {
        Self {
            remaining: count,
            position: 0,
        }
    }
    /// Number of items not yet yielded.
    pub fn remaining(&self) -> usize {
        self.remaining
    }
}
impl Iterator for ColumnIterator {
    type Item = Vec<u8>;
    fn next(&mut self) -> Option<Self::Item> {
        if self.remaining == 0 {
            return None;
        }
        self.remaining -= 1;
        self.position += 1;
        Some(vec![])
    }
    /// The exact remaining length is known, so report it. The default
    /// hint is `(0, None)`, which forces `collect`/`count` consumers to
    /// grow incrementally instead of preallocating.
    fn size_hint(&self) -> (usize, Option<usize>) {
        (self.remaining, Some(self.remaining))
    }
}
/// `size_hint` is exact, so `len()` can be offered too.
impl ExactSizeIterator for ColumnIterator {}
/// Facade over a `SochConnection` adding table-namespaced key/value
/// access, scan building, a lazily-populated column-id catalog, and
/// cumulative I/O statistics.
pub struct Storage {
    // Shared handle to the underlying connection.
    conn: Arc<SochConnection>,
    // Cumulative counters; `RwLock` so `stats()` snapshots are cheap reads.
    stats: parking_lot::RwLock<StorageStats>,
    // Maps (table, column name) -> ColumnId, filled on first resolve.
    column_catalog: parking_lot::RwLock<std::collections::HashMap<(String, String), ColumnId>>,
    // Next id handed out by `resolve_column_id`; starts at 1
    // (presumably 0 is reserved as "no column" — TODO confirm).
    next_column_id: std::sync::atomic::AtomicU32,
}
impl Storage {
pub fn new(conn: Arc<SochConnection>) -> Self {
Self {
conn,
stats: parking_lot::RwLock::new(StorageStats::default()),
column_catalog: parking_lot::RwLock::new(std::collections::HashMap::new()),
next_column_id: std::sync::atomic::AtomicU32::new(1),
}
}
pub fn scan<'a>(&'a self, table: &str, columns: &[&str]) -> ScanBuilder<'a> {
ScanBuilder::new(self, table, columns)
}
pub fn stats(&self) -> StorageStats {
self.stats.read().clone()
}
pub fn reset_stats(&self) {
*self.stats.write() = StorageStats::default();
}
pub fn compact(&self) -> Result<CompactionResult> {
let start = std::time::Instant::now();
let result = self.conn.compact();
let duration_ms = start.elapsed().as_millis() as u64;
match result {
Ok(metrics) => Ok(CompactionResult {
bytes_compacted: metrics.bytes_compacted.unwrap_or(0),
files_merged: metrics.files_merged.unwrap_or(0),
duration_ms,
}),
Err(_) => Ok(CompactionResult {
bytes_compacted: 0,
files_merged: 0,
duration_ms,
}),
}
}
pub fn flush(&self) -> Result<FlushResult> {
let start = std::time::Instant::now();
let result = self.conn.flush();
let duration_ms = start.elapsed().as_millis() as u64;
match result {
Ok(bytes) => {
let mut stats = self.stats.write();
stats.bytes_written += bytes as u64;
Ok(FlushResult {
bytes_flushed: bytes as u64,
duration_ms,
})
}
Err(_) => Ok(FlushResult {
bytes_flushed: 0,
duration_ms,
}),
}
}
pub fn get(&self, table: &str, key: &[u8]) -> Result<Option<Vec<u8>>> {
let mut stats = self.stats.write();
stats.blocks_read += 1;
let ns_key = format!("{}:{}", table, String::from_utf8_lossy(key));
match self.conn.get(ns_key.as_bytes()) {
Ok(Some(value)) => {
stats.bytes_read += value.len() as u64;
stats.cache_hits += 1;
Ok(Some(value))
}
Ok(None) => {
stats.cache_misses += 1;
Ok(None)
}
Err(e) => Err(e),
}
}
pub fn put(&self, table: &str, key: Vec<u8>, value: Vec<u8>) -> Result<()> {
let mut stats = self.stats.write();
stats.bytes_written += (key.len() + value.len()) as u64;
let ns_key = format!("{}:{}", table, String::from_utf8_lossy(&key));
self.conn.put(ns_key.into_bytes(), value)
}
pub fn delete(&self, table: &str, key: &[u8]) -> Result<()> {
let ns_key = format!("{}:{}", table, String::from_utf8_lossy(key));
self.conn.delete(ns_key.as_bytes())
}
pub fn resolve_column_id(&self, table: &str, name: &str) -> Result<ColumnId> {
use std::sync::atomic::Ordering;
let key = (table.to_string(), name.to_string());
{
let catalog = self.column_catalog.read();
if let Some(&id) = catalog.get(&key) {
return Ok(id);
}
}
let mut catalog = self.column_catalog.write();
if let Some(&id) = catalog.get(&key) {
return Ok(id);
}
let id = self.next_column_id.fetch_add(1, Ordering::SeqCst);
catalog.insert(key, id);
Ok(id)
}
fn record_scan(&self, columns: usize, rows: usize) {
let mut stats = self.stats.write();
stats.columns_scanned += columns as u64;
stats.rows_scanned += rows as u64;
}
}
/// Fluent builder for column scans; created via `Storage::scan`.
pub struct ScanBuilder<'a> {
    // Storage handle the scan runs against.
    storage: &'a Storage,
    #[allow(dead_code)]
    // Target table; stored but not consulted during execution here.
    table: String,
    // Column names to project.
    columns: Vec<String>,
    // Optional key range; only `start` is used (as a scan prefix).
    range: Option<Range<Vec<u8>>>,
    // Optional row filter; stored but never evaluated in this file.
    predicate: Option<Predicate>,
    // Optional cap on the number of rows reported.
    limit: Option<usize>,
}
impl<'a> ScanBuilder<'a> {
    /// Starts a scan of `columns` in `table` on `storage`.
    pub fn new(storage: &'a Storage, table: &str, columns: &[&str]) -> Self {
        Self {
            storage,
            table: table.to_string(),
            columns: columns.iter().map(|s| s.to_string()).collect(),
            range: None,
            predicate: None,
            limit: None,
        }
    }
    /// Restricts the scan to keys in `start..end`.
    ///
    /// NOTE(review): execution currently uses only `start` as a prefix;
    /// the `end` bound is stored but never applied — confirm against the
    /// connection's `scan_prefix` semantics.
    pub fn range(mut self, start: &[u8], end: &[u8]) -> Self {
        self.range = Some(start.to_vec()..end.to_vec());
        self
    }
    /// Attaches a row predicate.
    ///
    /// NOTE(review): the predicate is stored but not evaluated by
    /// `execute`/`count` in this file.
    pub fn filter(mut self, predicate: Predicate) -> Self {
        self.predicate = Some(predicate);
        self
    }
    /// Caps the number of rows reported.
    pub fn limit(mut self, n: usize) -> Self {
        self.limit = Some(n);
        self
    }
    /// Number of rows the underlying prefix scan yields, capped by
    /// `limit`. Shared by `execute` and `count`, which previously
    /// duplicated this logic.
    fn matching_rows(&self) -> Result<usize> {
        // With no range, scan with the empty prefix (everything).
        let prefix = self.range.as_ref().map(|r| r.start.as_slice()).unwrap_or(b"");
        let results = self.storage.conn.scan_prefix(prefix)?;
        Ok(match self.limit {
            Some(limit) => results.len().min(limit),
            None => results.len(),
        })
    }
    /// Runs the scan, records it in the storage stats, and returns an
    /// iterator over the matched rows.
    pub fn execute(self) -> Result<ColumnIterator> {
        let count = self.matching_rows()?;
        self.storage.record_scan(self.columns.len(), count);
        Ok(ColumnIterator::new(count))
    }
    /// Returns only the matched-row count; does not update scan stats.
    pub fn count(self) -> Result<usize> {
        self.matching_rows()
    }
}
/// Outcome of a `Storage::compact` call.
#[derive(Debug, Clone)]
pub struct CompactionResult {
    /// Bytes processed; 0 when the connection reported none or failed.
    pub bytes_compacted: u64,
    /// Files merged; 0 when the connection reported none or failed.
    pub files_merged: usize,
    /// Wall-clock duration of the compaction attempt.
    pub duration_ms: u64,
}
/// Outcome of a `Storage::flush` call.
#[derive(Debug, Clone)]
pub struct FlushResult {
    /// Bytes written out; 0 when the flush failed.
    pub bytes_flushed: u64,
    /// Wall-clock duration of the flush attempt.
    pub duration_ms: u64,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// 80 hits out of 100 lookups should yield a 0.8 hit rate.
    #[test]
    fn test_storage_stats() {
        let stats = StorageStats {
            cache_hits: 80,
            cache_misses: 20,
            ..Default::default()
        };
        let rate = stats.cache_hit_rate();
        assert!((rate - 0.8).abs() < 0.001);
    }

    /// The iterator yields exactly as many items as it was created with.
    #[test]
    fn test_column_iterator() {
        let iter = ColumnIterator::new(5);
        assert_eq!(iter.remaining(), 5);
        assert_eq!(iter.count(), 5);
    }

    /// A fresh storage facade starts with zeroed stats.
    #[test]
    fn test_storage_facade() {
        let storage = Storage::new(Arc::new(SochConnection::open("./test").unwrap()));
        assert_eq!(storage.stats().bytes_read, 0);
    }

    /// A scan over an empty store produces an empty iterator.
    #[test]
    fn test_scan_builder() {
        let storage = Storage::new(Arc::new(SochConnection::open("./test").unwrap()));
        let scan = storage.scan("users", &["id", "name"]).limit(10);
        let iter = scan.execute().unwrap();
        assert_eq!(iter.remaining(), 0);
    }
}