use crate::db::Result;
use std::fs::{File, OpenOptions};
use std::io::{Read, Seek, SeekFrom, Write};
use std::path::{Path, PathBuf};
#[cfg(feature = "object-store")]
use std::sync::Arc;
#[cfg(feature = "object-store")]
use rand::Rng;
/// Retry/backoff policy applied to object-store operations.
///
/// Delays grow exponentially from `base_delay_ms`, are capped at
/// `max_delay_ms`, and may have random jitter added (see
/// `RetryConfig::delay_for_attempt`).
#[cfg(feature = "object-store")]
#[derive(Debug, Clone)]
pub struct RetryConfig {
    /// Maximum number of retries after the initial attempt; 0 disables retrying.
    pub max_attempts: u32,
    /// Backoff delay for the first retry, in milliseconds.
    pub base_delay_ms: u64,
    /// Upper bound on the computed backoff delay, in milliseconds.
    pub max_delay_ms: u64,
    /// When true, adds random jitter on top of the computed delay.
    pub jitter: bool,
}
#[cfg(feature = "object-store")]
impl Default for RetryConfig {
    /// Default policy: up to 3 retries, 100 ms base backoff capped at 5 s,
    /// with jitter enabled.
    fn default() -> Self {
        RetryConfig {
            max_attempts: 3,
            base_delay_ms: 100,
            max_delay_ms: 5_000,
            jitter: true,
        }
    }
}
#[cfg(feature = "object-store")]
impl RetryConfig {
    /// A policy that disables retries entirely: the first failure is final.
    pub fn none() -> Self {
        Self {
            max_attempts: 0,
            ..Default::default()
        }
    }

    /// A policy that retries more often (5 attempts) with a shorter initial
    /// delay (50 ms) and a higher cap (10 s) than the default.
    pub fn aggressive() -> Self {
        Self {
            max_attempts: 5,
            base_delay_ms: 50,
            max_delay_ms: 10000,
            jitter: true,
        }
    }

    /// Computes the backoff delay for retry `attempt` (0-based).
    ///
    /// Uses exponential backoff (`base_delay_ms * 2^attempt`) capped at
    /// `max_delay_ms`; when `jitter` is set, up to 50% extra random delay is
    /// added so concurrent clients don't retry in lockstep.
    fn delay_for_attempt(&self, attempt: u32) -> std::time::Duration {
        // BUG FIX: `2u64.pow(attempt)` panics in debug builds (and wraps in
        // release) once `attempt >= 64`, and the multiplication can overflow
        // much earlier for large `base_delay_ms`. Use checked arithmetic and
        // saturate to the configured cap instead.
        let base_delay = 2u64
            .checked_pow(attempt)
            .and_then(|factor| factor.checked_mul(self.base_delay_ms))
            .unwrap_or(self.max_delay_ms);
        let delay = std::cmp::min(base_delay, self.max_delay_ms);
        if self.jitter {
            // Jitter is uniform in [0, delay/2].
            let jitter_range = delay / 2;
            let jitter = rand::thread_rng().gen_range(0..=jitter_range);
            std::time::Duration::from_millis(delay + jitter)
        } else {
            std::time::Duration::from_millis(delay)
        }
    }
}
/// Decides whether an object-store error is worth retrying.
///
/// Only transport-level failures surfaced as `Error::Generic` whose message
/// looks transient (timeouts, connection resets/refusals, temporary
/// unavailability) are retryable; every other variant is treated as final.
#[cfg(feature = "object-store")]
fn is_retryable_error(err: &object_store::Error) -> bool {
    let object_store::Error::Generic { source, .. } = err else {
        // NotFound, AlreadyExists, Precondition, NotModified, InvalidPath,
        // UnknownConfigurationKey, etc. — retrying cannot help.
        return false;
    };
    let msg = source.to_string().to_lowercase();
    [
        "timeout",
        "connection reset",
        "connection refused",
        "temporarily unavailable",
    ]
    .iter()
    .any(|needle| msg.contains(needle))
}
/// Abstraction over the physical storage of SSTable files, letting the
/// database run against either the local filesystem or a remote object store.
#[cfg(feature = "object-store")]
pub trait Storage: Send + Sync {
    /// Reads `size` bytes starting at byte `offset` from the file at `path`.
    fn read_block(&self, path: &Path, offset: u64, size: u32) -> Result<Vec<u8>>;
    /// Writes a complete SSTable to `path` (implementations replace any
    /// existing file at that path).
    fn write_sstable(&self, path: &Path, data: &[u8]) -> Result<()>;
    /// Deletes the SSTable at `path`.
    fn delete_sstable(&self, path: &Path) -> Result<()>;
    /// Flushes the file at `path` to durable storage; may be a no-op for
    /// backends that are already durable on write.
    fn sync(&self, path: &Path) -> Result<()>;
    /// Returns whether a file exists at `path`.
    fn exists(&self, path: &Path) -> Result<bool>;
    /// Lists the `.sst` files under `dir`.
    fn list_sstables(&self, dir: &Path) -> Result<Vec<PathBuf>>;
}
/// Filesystem-backed storage rooted at a base directory.
pub struct LocalStorage {
    // Relative paths handed to the storage methods are resolved against this.
    base_path: PathBuf,
}

impl LocalStorage {
    /// Creates a local storage rooted at `base_path`.
    #[must_use]
    pub const fn new(base_path: PathBuf) -> Self {
        Self { base_path }
    }

    /// Resolves `path` against the base directory: absolute paths pass
    /// through untouched, relative paths are joined onto `base_path`.
    fn full_path(&self, path: &Path) -> PathBuf {
        if path.is_absolute() {
            return path.to_path_buf();
        }
        self.base_path.join(path)
    }
}
#[cfg(feature = "object-store")]
impl Storage for LocalStorage {
    /// Reads exactly `size` bytes from `path`, starting at byte `offset`.
    fn read_block(&self, path: &Path, offset: u64, size: u32) -> Result<Vec<u8>> {
        let mut file = File::open(self.full_path(path))?;
        file.seek(SeekFrom::Start(offset))?;
        let mut block = vec![0u8; size as usize];
        file.read_exact(&mut block)?;
        Ok(block)
    }

    /// Creates (or truncates) the file at `path`, writes `data` in full, and
    /// fsyncs before returning. Missing parent directories are created.
    fn write_sstable(&self, path: &Path, data: &[u8]) -> Result<()> {
        let target = self.full_path(path);
        if let Some(dir) = target.parent() {
            std::fs::create_dir_all(dir)?;
        }
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&target)?;
        file.write_all(data)?;
        file.sync_all()?;
        Ok(())
    }

    /// Removes the file at `path`.
    fn delete_sstable(&self, path: &Path) -> Result<()> {
        std::fs::remove_file(self.full_path(path))?;
        Ok(())
    }

    /// Opens the file at `path` for writing and fsyncs it.
    fn sync(&self, path: &Path) -> Result<()> {
        OpenOptions::new()
            .write(true)
            .open(self.full_path(path))?
            .sync_all()?;
        Ok(())
    }

    /// Returns whether a file exists at `path`.
    fn exists(&self, path: &Path) -> Result<bool> {
        Ok(self.full_path(path).exists())
    }

    /// Lists `.sst` files directly inside `dir` (non-recursive); a missing
    /// or non-directory `dir` yields an empty list.
    fn list_sstables(&self, dir: &Path) -> Result<Vec<PathBuf>> {
        let root = self.full_path(dir);
        // `is_dir` already implies existence, matching the original
        // exists-and-is-dir guard.
        if !root.is_dir() {
            return Ok(Vec::new());
        }
        let mut sstables = Vec::new();
        for entry in std::fs::read_dir(&root)? {
            let candidate = entry?.path();
            if candidate.extension().and_then(|ext| ext.to_str()) == Some("sst") {
                sstables.push(candidate);
            }
        }
        Ok(sstables)
    }
}
#[cfg(not(feature = "object-store"))]
impl LocalStorage {
    /// Reads exactly `size` bytes from `path`, beginning at byte `offset`.
    pub fn read_block(&self, path: &Path, offset: u64, size: u32) -> Result<Vec<u8>> {
        let resolved = self.full_path(path);
        let mut file = File::open(&resolved)?;
        file.seek(SeekFrom::Start(offset))?;
        let mut block = vec![0u8; size as usize];
        file.read_exact(&mut block)?;
        Ok(block)
    }

    /// Creates (or truncates) the file at `path`, writes `data` in full, and
    /// fsyncs before returning. Missing parent directories are created.
    pub fn write_sstable(&self, path: &Path, data: &[u8]) -> Result<()> {
        let resolved = self.full_path(path);
        if let Some(dir) = resolved.parent() {
            std::fs::create_dir_all(dir)?;
        }
        let mut file = OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(&resolved)?;
        file.write_all(data)?;
        file.sync_all()?;
        Ok(())
    }

    /// Removes the file at `path`.
    pub fn delete_sstable(&self, path: &Path) -> Result<()> {
        std::fs::remove_file(self.full_path(path))?;
        Ok(())
    }

    /// Opens the file at `path` for writing and fsyncs it.
    pub fn sync(&self, path: &Path) -> Result<()> {
        let file = OpenOptions::new().write(true).open(self.full_path(path))?;
        file.sync_all()?;
        Ok(())
    }

    /// Returns whether a file exists at `path`.
    pub fn exists(&self, path: &Path) -> Result<bool> {
        Ok(self.full_path(path).exists())
    }

    /// Lists `.sst` files directly inside `dir` (non-recursive); a missing
    /// or non-directory `dir` yields an empty list.
    pub fn list_sstables(&self, dir: &Path) -> Result<Vec<PathBuf>> {
        let root = self.full_path(dir);
        let mut found = Vec::new();
        if root.exists() && root.is_dir() {
            for entry in std::fs::read_dir(&root)? {
                let candidate = entry?.path();
                if candidate.extension().and_then(|ext| ext.to_str()) == Some("sst") {
                    found.push(candidate);
                }
            }
        }
        Ok(found)
    }
}
/// Storage backend that keeps SSTables in a remote object store
/// (S3, GCS, Azure, or an in-memory store) via the `object_store` crate.
#[cfg(feature = "object-store")]
pub struct ObjectStoreBackend {
    // The underlying object-store client (trait object so any provider fits).
    store: Arc<dyn object_store::ObjectStore>,
    // Tokio runtime handle used to drive async store calls from the
    // synchronous `Storage` methods.
    runtime: tokio::runtime::Handle,
    // Key prefix prepended to every object path; empty string means none.
    prefix: String,
    // Retry/backoff policy for transient failures.
    retry_config: RetryConfig,
}
#[cfg(feature = "object-store")]
impl ObjectStoreBackend {
    /// Creates a backend over `store` with the default retry policy.
    ///
    /// NOTE(review): this captures the ambient runtime via
    /// `Handle::current()`, which panics when called outside a Tokio runtime
    /// context — confirm all construction sites run inside Tokio.
    pub fn new(store: Arc<dyn object_store::ObjectStore>, prefix: String) -> Self {
        Self::with_retry_config(store, prefix, RetryConfig::default())
    }

    /// Creates a backend over `store` with an explicit retry policy.
    pub fn with_retry_config(
        store: Arc<dyn object_store::ObjectStore>,
        prefix: String,
        retry_config: RetryConfig,
    ) -> Self {
        Self {
            store,
            // Panics if not called from within a Tokio runtime.
            runtime: tokio::runtime::Handle::current(),
            prefix,
            retry_config,
        }
    }

    /// Builds an S3-backed store for `bucket` in `region`.
    ///
    /// `endpoint` overrides the AWS endpoint (useful for MinIO/LocalStack);
    /// plain HTTP is only allowed when an explicit endpoint is given.
    pub fn s3(bucket: &str, region: &str, endpoint: Option<&str>, prefix: String) -> Result<Self> {
        use object_store::aws::AmazonS3Builder;
        let mut builder = AmazonS3Builder::new()
            .with_bucket_name(bucket)
            .with_region(region);
        if let Some(ep) = endpoint {
            builder = builder.with_endpoint(ep).with_allow_http(true);
        }
        let store = builder
            .build()
            .map_err(|e| crate::db::DBError::ObjectStore(e.to_string()))?;
        Ok(Self {
            store: Arc::new(store),
            runtime: tokio::runtime::Handle::current(),
            prefix,
            retry_config: RetryConfig::default(),
        })
    }

    /// Builds a Google Cloud Storage backend for `bucket`, optionally
    /// authenticating with the service-account key file at
    /// `service_account_path`.
    pub fn gcs(bucket: &str, service_account_path: Option<&Path>, prefix: String) -> Result<Self> {
        use object_store::gcp::GoogleCloudStorageBuilder;
        let mut builder = GoogleCloudStorageBuilder::new().with_bucket_name(bucket);
        if let Some(path) = service_account_path {
            builder = builder.with_service_account_path(path.to_string_lossy());
        }
        let store = builder
            .build()
            .map_err(|e| crate::db::DBError::ObjectStore(e.to_string()))?;
        Ok(Self {
            store: Arc::new(store),
            runtime: tokio::runtime::Handle::current(),
            prefix,
            retry_config: RetryConfig::default(),
        })
    }

    /// Builds an Azure Blob Storage backend for `container` under `account`.
    pub fn azure(container: &str, account: &str, prefix: String) -> Result<Self> {
        use object_store::azure::MicrosoftAzureBuilder;
        let store = MicrosoftAzureBuilder::new()
            .with_container_name(container)
            .with_account(account)
            .build()
            .map_err(|e| crate::db::DBError::ObjectStore(e.to_string()))?;
        Ok(Self {
            store: Arc::new(store),
            runtime: tokio::runtime::Handle::current(),
            prefix,
            retry_config: RetryConfig::default(),
        })
    }

    /// Maps a filesystem-style `path` to an object-store key, prepending
    /// `prefix` (with any trailing '/' trimmed) when one is configured.
    ///
    /// NOTE(review): `path.display()` keeps native separators — on Windows a
    /// relative path would embed backslashes in the object key; confirm
    /// callers only pass forward-slash paths.
    fn to_object_path(&self, path: &Path) -> object_store::path::Path {
        let path_str = if self.prefix.is_empty() {
            path.to_string_lossy().to_string()
        } else {
            format!("{}/{}", self.prefix.trim_end_matches('/'), path.display())
        };
        object_store::path::Path::from(path_str)
    }

    /// Runs `operation`, retrying transient failures (per
    /// `is_retryable_error`) with backoff from `retry_config`.
    ///
    /// Both non-retryable errors and exhausted attempts are flattened into
    /// `DBError::ObjectStore` carrying the stringified source error.
    async fn retry_async<F, T, Fut>(&self, operation: F) -> Result<T>
    where
        F: Fn() -> Fut,
        Fut: std::future::Future<Output = std::result::Result<T, object_store::Error>>,
    {
        let mut attempt = 0;
        loop {
            match operation().await {
                Ok(result) => return Ok(result),
                Err(e) => {
                    // `max_attempts` counts retries after the initial try, so
                    // the operation runs at most `max_attempts + 1` times.
                    if attempt >= self.retry_config.max_attempts || !is_retryable_error(&e) {
                        return Err(crate::db::DBError::ObjectStore(e.to_string()));
                    }
                    let delay = self.retry_config.delay_for_attempt(attempt);
                    tokio::time::sleep(delay).await;
                    attempt += 1;
                }
            }
        }
    }
}
#[cfg(feature = "object-store")]
impl Storage for ObjectStoreBackend {
    /// Fetches a byte range of the object, bridging sync -> async with
    /// `Handle::block_on`.
    ///
    /// NOTE(review): `Handle::block_on` panics if invoked from within an
    /// async context — confirm these methods are only called from plain
    /// (non-runtime) threads. Also, `offset as usize + size as usize` can
    /// overflow on 32-bit targets for very large offsets — verify the
    /// callers' range assumptions.
    fn read_block(&self, path: &Path, offset: u64, size: u32) -> Result<Vec<u8>> {
        let object_path = self.to_object_path(path);
        let range = offset as usize..(offset as usize + size as usize);
        self.runtime.block_on(async {
            self.retry_async(|| async {
                let bytes = self.store.get_range(&object_path, range.clone()).await?;
                Ok(bytes.to_vec())
            })
            .await
        })
    }

    /// Uploads `data` as the full object at `path` (a retried `put`).
    fn write_sstable(&self, path: &Path, data: &[u8]) -> Result<()> {
        let object_path = self.to_object_path(path);
        // Owned copy so each retry closure can clone the payload.
        let data = data.to_vec();
        self.runtime.block_on(async {
            self.retry_async(|| async {
                self.store.put(&object_path, data.clone().into()).await?;
                Ok(())
            })
            .await
        })
    }

    /// Deletes the object at `path` (retried).
    fn delete_sstable(&self, path: &Path) -> Result<()> {
        let object_path = self.to_object_path(path);
        self.runtime
            .block_on(async { self.retry_async(|| self.store.delete(&object_path)).await })
    }

    /// No-op: object stores make writes durable at `put` time.
    fn sync(&self, _path: &Path) -> Result<()> {
        Ok(())
    }

    /// Probes for the object with a `head` request.
    ///
    /// NOTE(review): missing objects are detected by substring-matching
    /// "not found" in the stringified error (the typed `NotFound` variant is
    /// lost inside `retry_async`) — fragile if the crate's error text ever
    /// changes; consider surfacing the typed error instead.
    fn exists(&self, path: &Path) -> Result<bool> {
        let object_path = self.to_object_path(path);
        self.runtime.block_on(async {
            match self
                .retry_async(|| async { self.store.head(&object_path).await })
                .await
            {
                Ok(_) => Ok(true),
                Err(crate::db::DBError::ObjectStore(msg)) if msg.contains("not found") => Ok(false),
                Err(e) => Err(e),
            }
        })
    }

    /// Lists every object under `dir` whose key ends in `.sst`.
    fn list_sstables(&self, dir: &Path) -> Result<Vec<PathBuf>> {
        let prefix = self.to_object_path(dir);
        self.runtime.block_on(async {
            use futures::TryStreamExt;
            self.retry_async(|| async {
                let mut sstables = Vec::new();
                let mut stream = self.store.list(Some(&prefix));
                while let Some(meta) = stream.try_next().await? {
                    let path_str = meta.location.to_string();
                    if path_str.ends_with(".sst") {
                        sstables.push(PathBuf::from(path_str));
                    }
                }
                Ok(sstables)
            })
            .await
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // Writing a file and reading back its full byte range round-trips the data.
    #[test]
    fn test_local_storage_write_read() {
        let dir = tempdir().unwrap();
        let storage = LocalStorage::new(dir.path().to_path_buf());
        let path = Path::new("test.sst");
        let data = b"hello world";
        storage.write_sstable(path, data).unwrap();
        let read_data = storage.read_block(path, 0, data.len() as u32).unwrap();
        assert_eq!(&read_data, data);
    }

    // `exists` flips from false to true after a write.
    #[test]
    fn test_local_storage_exists() {
        let dir = tempdir().unwrap();
        let storage = LocalStorage::new(dir.path().to_path_buf());
        let path = Path::new("test.sst");
        assert!(!storage.exists(path).unwrap());
        storage.write_sstable(path, b"data").unwrap();
        assert!(storage.exists(path).unwrap());
    }

    // Deleting an SSTable makes it disappear from `exists`.
    #[test]
    fn test_local_storage_delete() {
        let dir = tempdir().unwrap();
        let storage = LocalStorage::new(dir.path().to_path_buf());
        let path = Path::new("test.sst");
        storage.write_sstable(path, b"data").unwrap();
        assert!(storage.exists(path).unwrap());
        storage.delete_sstable(path).unwrap();
        assert!(!storage.exists(path).unwrap());
    }

    // All `.sst` files in the base directory are listed.
    #[test]
    fn test_local_storage_list() {
        let dir = tempdir().unwrap();
        let storage = LocalStorage::new(dir.path().to_path_buf());
        storage
            .write_sstable(Path::new("L0_001.sst"), b"data1")
            .unwrap();
        storage
            .write_sstable(Path::new("L0_002.sst"), b"data2")
            .unwrap();
        storage
            .write_sstable(Path::new("L1_001.sst"), b"data3")
            .unwrap();
        let sstables = storage.list_sstables(Path::new(".")).unwrap();
        assert_eq!(sstables.len(), 3);
    }

    // Tests for the cloud backend against an in-memory object store; only
    // built when the `object-store` feature is enabled.
    #[cfg(feature = "object-store")]
    mod object_store_tests {
        use super::*;
        use object_store::memory::InMemory;
        use object_store::ObjectStore as _;

        // Builds a backend over an in-memory store. The struct is constructed
        // inside `rt.block_on` so that `Handle::current()` resolves to the
        // test runtime instead of panicking.
        fn create_test_backend() -> (tokio::runtime::Runtime, ObjectStoreBackend) {
            let rt = tokio::runtime::Runtime::new().unwrap();
            let store = Arc::new(InMemory::new());
            let backend = rt.block_on(async {
                ObjectStoreBackend {
                    store,
                    runtime: tokio::runtime::Handle::current(),
                    prefix: String::new(),
                    retry_config: RetryConfig::default(),
                }
            });
            // The runtime is returned alongside the backend so it outlives
            // the handle captured above.
            (rt, backend)
        }

        // Full and partial range reads return exactly what was written.
        #[test]
        fn test_object_store_backend_write_read() {
            let (rt, backend) = create_test_backend();
            // Keep the runtime context entered for the duration of the test.
            let _guard = rt.enter();
            let path = Path::new("test.sst");
            let data = b"hello world from cloud";
            backend.write_sstable(path, data).unwrap();
            let read_data = backend.read_block(path, 0, data.len() as u32).unwrap();
            assert_eq!(&read_data, data);
            let partial = backend.read_block(path, 6, 5).unwrap();
            assert_eq!(&partial, b"world");
        }

        // `exists` flips from false to true after a write.
        #[test]
        fn test_object_store_backend_exists() {
            let (rt, backend) = create_test_backend();
            let _guard = rt.enter();
            let path = Path::new("test.sst");
            assert!(!backend.exists(path).unwrap());
            backend.write_sstable(path, b"data").unwrap();
            assert!(backend.exists(path).unwrap());
        }

        // Deleting an object makes it disappear from `exists`.
        #[test]
        fn test_object_store_backend_delete() {
            let (rt, backend) = create_test_backend();
            let _guard = rt.enter();
            let path = Path::new("test.sst");
            backend.write_sstable(path, b"data").unwrap();
            assert!(backend.exists(path).unwrap());
            backend.delete_sstable(path).unwrap();
            assert!(!backend.exists(path).unwrap());
        }

        // Listing with an empty prefix returns all written `.sst` objects.
        #[test]
        fn test_object_store_backend_list() {
            let (rt, backend) = create_test_backend();
            let _guard = rt.enter();
            backend
                .write_sstable(Path::new("L0_001.sst"), b"data1")
                .unwrap();
            backend
                .write_sstable(Path::new("L0_002.sst"), b"data2")
                .unwrap();
            backend
                .write_sstable(Path::new("L1_001.sst"), b"data3")
                .unwrap();
            let sstables = backend.list_sstables(Path::new("")).unwrap();
            assert_eq!(sstables.len(), 3);
        }

        // A configured prefix is prepended to the object key, and reads
        // through the backend still resolve the prefixed path.
        #[test]
        fn test_object_store_backend_with_prefix() {
            let rt = tokio::runtime::Runtime::new().unwrap();
            let store = Arc::new(InMemory::new());
            let backend = rt.block_on(async {
                ObjectStoreBackend {
                    store: store.clone(),
                    runtime: tokio::runtime::Handle::current(),
                    prefix: "seerdb/data".to_string(),
                    retry_config: RetryConfig::default(),
                }
            });
            let _guard = rt.enter();
            let path = Path::new("L0_001.sst");
            backend.write_sstable(path, b"data").unwrap();
            // Verify the raw store sees the prefixed key.
            let object_path = object_store::path::Path::from("seerdb/data/L0_001.sst");
            let exists = rt.block_on(async { store.head(&object_path).await.is_ok() });
            assert!(exists);
            let data = backend.read_block(path, 0, 4).unwrap();
            assert_eq!(&data, b"data");
        }

        // `sync` is a no-op for object stores and must not error.
        #[test]
        fn test_object_store_backend_sync_is_noop() {
            let (rt, backend) = create_test_backend();
            let _guard = rt.enter();
            let path = Path::new("test.sst");
            backend.sync(path).unwrap();
        }
    }
}