use crate::cache::CacheKey;
use crate::error::{Result, SomaError};
use crate::store::{DataRef, DataStore, StorageConfig};
use crate::value::Value;
use s3::creds::Credentials;
use s3::{Bucket, Region};
use std::path::PathBuf;
/// [`DataStore`] backed by an S3-compatible bucket, with a local on-disk copy of object
/// bodies used as a read cache.
///
/// The store exposes a synchronous API: every S3 request is driven to completion with
/// `block_on` on the store's own current-thread tokio runtime (`rt`). Because tokio's
/// `Runtime::block_on` panics when called from within an asynchronous execution context,
/// these methods must not be called from inside another tokio runtime.
pub struct S3DataStore {
    /// Storage description returned by [`DataStore::config`]; built once in `new`.
    config: StorageConfig,
    /// Handle to the remote bucket (configured with path-style addressing in `new`).
    bucket: Box<Bucket>,
    /// String prepended verbatim to every object key (see `s3_key`).
    prefix: String,
    /// Directory holding cached object bodies, named by the cache key's hex digest.
    local_cache: PathBuf,
    /// Runtime used to block on the async S3 client calls.
    rt: tokio::runtime::Runtime,
}
impl S3DataStore {
    /// Creates a store for `bucket_name` on the S3-compatible service at `endpoint`.
    ///
    /// * `prefix` is prepended verbatim to every object key; include a trailing `/` if
    ///   objects should live under a "directory".
    /// * `endpoint` is either a bare `host[:port]`, in which case `https://` is assumed, or a
    ///   URL that already starts with `http://` or `https://` (matched case-insensitively),
    ///   which is used unchanged. The value is recorded as given in [`StorageConfig::S3`].
    /// * `local_cache` is the directory for cached object bodies. It is created if missing;
    ///   failure to create it is ignored because all cache writes are best-effort.
    ///
    /// The bucket uses path-style addressing, and a private current-thread tokio runtime is
    /// built to drive the async S3 client.
    ///
    /// # Errors
    ///
    /// Returns [`SomaError::DataStore`] if the credentials, the bucket handle, or the tokio
    /// runtime cannot be constructed.
    pub fn new(
        bucket_name: impl Into<String>,
        prefix: impl Into<String>,
        endpoint: impl Into<String>,
        access_key: impl Into<String>,
        secret_key: impl Into<String>,
        local_cache: impl Into<PathBuf>,
    ) -> Result<Self> {
        let bucket_name = bucket_name.into();
        let prefix = prefix.into();
        let endpoint = endpoint.into();
        let access_key = access_key.into();
        let secret_key = secret_key.into();
        let cache_dir = local_cache.into();

        // Best-effort, like every other cache write: if the directory cannot be created,
        // cache writes simply fail and reads are served from S3.
        let _ = std::fs::create_dir_all(&cache_dir);

        let region = Region::Custom {
            region: String::new(),
            endpoint: Self::endpoint_url(&endpoint),
        };
        let credentials = Credentials::new(
            Some(access_key.as_str()),
            Some(secret_key.as_str()),
            None,
            None,
            None,
        )
        .map_err(|e| SomaError::DataStore(format!("S3 credentials error: {e}")))?;
        let bucket = Bucket::new(&bucket_name, region, credentials)
            .map_err(|e| SomaError::DataStore(format!("S3 bucket error: {e}")))?
            .with_path_style();
        let rt = tokio::runtime::Builder::new_current_thread()
            .enable_all()
            .build()
            .map_err(|e| SomaError::DataStore(format!("tokio runtime error: {e}")))?;

        Ok(Self {
            config: StorageConfig::S3 {
                bucket: bucket_name,
                prefix: prefix.clone(),
                region: None,
                endpoint: Some(endpoint),
            },
            bucket,
            prefix,
            local_cache: cache_dir,
            rt,
        })
    }

    /// Creates a store from the environment variables `BUCKET_NAME`, `BUCKET_ENDPOINT`,
    /// `BUCKET_KEY_ID` and `BUCKET_KEY_SECRET`.
    ///
    /// See [`S3DataStore::new`] for the meaning of each value and of `prefix` /
    /// `local_cache`.
    ///
    /// # Errors
    ///
    /// Returns [`SomaError::DataStore`] naming the first variable that is missing or not
    /// valid Unicode, or any error produced by [`S3DataStore::new`].
    pub fn from_env(prefix: impl Into<String>, local_cache: impl Into<PathBuf>) -> Result<Self> {
        let bucket_name = Self::required_env("BUCKET_NAME")?;
        let endpoint = Self::required_env("BUCKET_ENDPOINT")?;
        let key_id = Self::required_env("BUCKET_KEY_ID")?;
        let key_secret = Self::required_env("BUCKET_KEY_SECRET")?;
        Self::new(
            bucket_name,
            prefix,
            endpoint,
            key_id,
            key_secret,
            local_cache,
        )
    }

    /// Reads a mandatory environment variable, distinguishing "missing" from "not Unicode".
    fn required_env(name: &str) -> Result<String> {
        std::env::var(name).map_err(|e| {
            SomaError::DataStore(match e {
                std::env::VarError::NotPresent => format!("{name} not set"),
                std::env::VarError::NotUnicode(_) => format!("{name} is not valid Unicode"),
            })
        })
    }

    /// Turns a configured endpoint into a URL, assuming HTTPS when no scheme is present.
    fn endpoint_url(endpoint: &str) -> String {
        let has_scheme = ["http://", "https://"].iter().any(|scheme| {
            endpoint
                .get(..scheme.len())
                .map_or(false, |head| head.eq_ignore_ascii_case(scheme))
        });
        if has_scheme {
            endpoint.to_string()
        } else {
            format!("https://{endpoint}")
        }
    }

    /// Object key for `cache_key`: the configured prefix followed by the key's hex digest.
    fn s3_key(&self, cache_key: &CacheKey) -> String {
        format!("{}{}", self.prefix, cache_key.to_hex())
    }

    /// Local cache file for `cache_key`: `<local_cache>/<hex digest>`.
    fn local_path(&self, cache_key: &CacheKey) -> PathBuf {
        self.local_cache.join(cache_key.to_hex())
    }
}
impl S3DataStore {
    /// Returns the cache file for the S3 object `key`, or `None` if `key` must bypass the
    /// cache.
    ///
    /// Only keys of the form `<prefix><name>` (the shape `s3_key` produces) are cached, and
    /// `<name>` must be a single plain file name. For keys written by this store the result
    /// equals `local_path(cache_key)`, so `DataRef::S3` and `DataRef::Cached` share one cache
    /// file per object whether or not `prefix` ends in `/`. Any other key (different prefix,
    /// nested path, `.`/`..`, trailing `/`) is not cached. Such keys would otherwise collide
    /// with unrelated objects on their last path segment or name a directory.
    fn cache_path_for_s3_key(&self, key: &str) -> Option<PathBuf> {
        let name = key.strip_prefix(self.prefix.as_str())?;
        let is_plain_name = !name.is_empty()
            && name != "."
            && name != ".."
            && !name.contains(|c: char| c == '/' || c == '\\');
        if is_plain_name {
            Some(self.local_cache.join(name))
        } else {
            None
        }
    }

    /// Best-effort read of a cached JSON value.
    ///
    /// Returns `None` when the file is missing, unreadable, or does not decode as `Value`.
    /// Callers then fall back to S3 instead of failing on a damaged cache entry.
    fn read_cached_value(path: &std::path::Path) -> Option<Value> {
        let bytes = std::fs::read(path).ok()?;
        serde_json::from_slice(&bytes).ok()
    }

    /// Downloads `s3_key`, decodes it as JSON, and stores the body at `cache_path` if one
    /// is given.
    ///
    /// The body is decoded *before* it is cached, so content that cannot be decoded is never
    /// persisted locally (it would otherwise be served from the cache on every later read).
    ///
    /// # Errors
    ///
    /// `SomaError::DataStore` if the GET fails or the body is not valid JSON.
    fn fetch_remote_value(
        &self,
        s3_key: &str,
        cache_path: Option<&std::path::Path>,
    ) -> Result<Value> {
        let response = self
            .rt
            .block_on(self.bucket.get_object(s3_key))
            .map_err(|e| SomaError::DataStore(format!("S3 GET failed: {e}")))?;
        let bytes = response.bytes();
        let value: Value = serde_json::from_slice(bytes)
            .map_err(|e| SomaError::DataStore(format!("deserialize failed: {e}")))?;
        if let Some(path) = cache_path {
            Self::write_cache_file(path, bytes);
        }
        Ok(value)
    }

    /// Best-effort cache write that replaces `path` via a uniquely named sibling temp file
    /// and a rename.
    ///
    /// On platforms where a same-directory rename is atomic, readers never see a
    /// half-written entry. Any failure is ignored and the temp file is cleaned up, because
    /// the cache is only an optimisation.
    fn write_cache_file(path: &std::path::Path, bytes: &[u8]) {
        static NEXT_TMP_ID: std::sync::atomic::AtomicUsize =
            std::sync::atomic::AtomicUsize::new(0);
        let file_name = match path.file_name() {
            Some(name) => name,
            None => return,
        };
        // pid + per-process counter keeps concurrent writers from sharing a temp file.
        let mut tmp_name = file_name.to_os_string();
        tmp_name.push(format!(
            ".{}.{}.tmp",
            std::process::id(),
            NEXT_TMP_ID.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
        ));
        let tmp_path = path.with_file_name(tmp_name);
        let written =
            std::fs::write(&tmp_path, bytes).and_then(|()| std::fs::rename(&tmp_path, path));
        if written.is_err() {
            let _ = std::fs::remove_file(&tmp_path);
        }
    }

    /// Deletes `s3_key` from the bucket.
    ///
    /// # Errors
    ///
    /// `SomaError::DataStore` if the DELETE request fails.
    fn delete_remote(&self, s3_key: &str) -> Result<()> {
        self.rt
            .block_on(self.bucket.delete_object(s3_key))
            .map(|_| ())
            .map_err(|e| SomaError::DataStore(format!("S3 DELETE failed: {e}")))
    }
}

/// JSON-over-S3 implementation of [`DataStore`] with a best-effort local read cache.
impl DataStore for S3DataStore {
    /// Serialises `data` as JSON, uploads it under `s3_key(key)`, then caches the bytes
    /// locally.
    ///
    /// # Errors
    ///
    /// `SomaError::DataStore` if serialisation or the upload fails. Cache write failures are
    /// ignored.
    fn put(&self, key: &CacheKey, data: &Value) -> Result<DataRef> {
        let bytes = serde_json::to_vec(data)
            .map_err(|e| SomaError::DataStore(format!("serialize failed: {e}")))?;
        let s3_key = self.s3_key(key);
        self.rt
            .block_on(self.bucket.put_object(&s3_key, &bytes))
            .map_err(|e| SomaError::DataStore(format!("S3 PUT failed: {e}")))?;
        Self::write_cache_file(&self.local_path(key), &bytes);
        Ok(DataRef::S3 {
            bucket: self.bucket.name().to_string(),
            key: s3_key,
            region: None,
        })
    }

    /// Loads the value referenced by `data_ref`.
    ///
    /// `S3` and `Cached` refs are served from the local cache when a readable, decodable
    /// entry exists. Otherwise the object is downloaded and the cache entry refreshed.
    /// `Inline` values are cloned and `Local` files are read directly. Any other variant is
    /// rejected.
    fn get(&self, data_ref: &DataRef) -> Result<Value> {
        match data_ref {
            // NOTE(review): the ref's `bucket` field is not compared with this store's
            // bucket; a ref to another bucket is resolved against `self.bucket`.
            DataRef::S3 { key, .. } => {
                let cache_path = self.cache_path_for_s3_key(key);
                if let Some(value) = cache_path.as_deref().and_then(Self::read_cached_value) {
                    return Ok(value);
                }
                self.fetch_remote_value(key, cache_path.as_deref())
            }
            DataRef::Cached { cache_key } => {
                let cache_path = self.local_path(cache_key);
                if let Some(value) = Self::read_cached_value(&cache_path) {
                    return Ok(value);
                }
                self.fetch_remote_value(&self.s3_key(cache_key), Some(cache_path.as_path()))
            }
            DataRef::Inline { value } => Ok(value.clone()),
            DataRef::Local { path } => {
                let bytes = std::fs::read(path).map_err(|e| SomaError::DataStore(e.to_string()))?;
                serde_json::from_slice(&bytes).map_err(|e| SomaError::DataStore(e.to_string()))
            }
            _ => Err(SomaError::DataStore(
                "Unsupported DataRef type for S3DataStore".into(),
            )),
        }
    }

    /// Reports whether `data_ref` resolves to stored data.
    ///
    /// `S3` refs always issue a HEAD request. `Cached` refs check the local cache first and
    /// only then issue a HEAD request. `Inline` is always present, `Local` checks the file
    /// system, and other variants report `false`.
    fn exists(&self, data_ref: &DataRef) -> Result<bool> {
        // NOTE(review): every HEAD error (including network/auth failures) is reported as
        // "absent". This also relies on the S3 client returning `Err` (not `Ok` with a 404
        // status) for missing objects — confirm the client is configured that way.
        match data_ref {
            DataRef::S3 { key, .. } => Ok(self.rt.block_on(self.bucket.head_object(key)).is_ok()),
            DataRef::Cached { cache_key } => {
                if self.local_path(cache_key).exists() {
                    return Ok(true);
                }
                let s3_key = self.s3_key(cache_key);
                Ok(self.rt.block_on(self.bucket.head_object(&s3_key)).is_ok())
            }
            DataRef::Inline { .. } => Ok(true),
            DataRef::Local { path } => Ok(std::path::Path::new(path).exists()),
            _ => Ok(false),
        }
    }

    /// Deletes the data behind `data_ref`.
    ///
    /// For `S3` and `Cached` refs the remote object is deleted first (errors propagate), then
    /// the local cache file is removed best-effort. `Local` files are removed best-effort.
    /// Other variants are a no-op.
    fn remove(&self, data_ref: &DataRef) -> Result<()> {
        match data_ref {
            DataRef::S3 { key, .. } => {
                self.delete_remote(key)?;
                if let Some(path) = self.cache_path_for_s3_key(key) {
                    let _ = std::fs::remove_file(path);
                }
            }
            DataRef::Cached { cache_key } => {
                self.delete_remote(&self.s3_key(cache_key))?;
                let _ = std::fs::remove_file(self.local_path(cache_key));
            }
            DataRef::Local { path } => {
                let _ = std::fs::remove_file(path);
            }
            _ => {}
        }
        Ok(())
    }

    /// Returns the storage configuration recorded when the store was built.
    fn config(&self) -> &StorageConfig {
        &self.config
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Per-test scratch directory under the platform temp dir (portable, unlike `/tmp`).
    fn scratch_dir(name: &str) -> PathBuf {
        std::env::temp_dir().join(name)
    }

    /// Construction needs no network access, so fake credentials are fine.
    #[test]
    fn constructs_from_params() {
        let store = S3DataStore::new(
            "test-bucket",
            "data/",
            "s3.eu-central-003.backblazeb2.com",
            "fake_key_id",
            "fake_key_secret",
            scratch_dir("soma-s3-test-construct"),
        );
        assert!(store.is_ok());
    }

    #[test]
    fn s3_key_generation() {
        let store = S3DataStore::new(
            "b",
            "prefix/",
            "localhost:9000",
            "key",
            "secret",
            scratch_dir("soma-s3-test-keygen"),
        )
        .unwrap();
        let key = CacheKey::hash_data(b"test");
        let s3_key = store.s3_key(&key);
        assert!(s3_key.starts_with("prefix/"));
        assert_eq!(s3_key.len(), "prefix/".len() + 64);
        assert_eq!(s3_key, format!("prefix/{}", key.to_hex()));
    }

    #[test]
    fn local_path_is_inside_cache_dir() {
        let dir = scratch_dir("soma-s3-test-localpath");
        let store =
            S3DataStore::new("b", "prefix/", "localhost:9000", "key", "secret", dir.clone())
                .unwrap();
        let key = CacheKey::hash_data(b"test");
        assert_eq!(store.local_path(&key), dir.join(key.to_hex()));
    }

    #[test]
    fn config_is_s3() {
        let store = S3DataStore::new(
            "b",
            "p/",
            "localhost",
            "k",
            "s",
            scratch_dir("soma-s3-test-cfg"),
        )
        .unwrap();
        assert!(matches!(store.config(), StorageConfig::S3 { .. }));
        if let StorageConfig::S3 {
            bucket,
            prefix,
            endpoint,
            ..
        } = store.config()
        {
            assert_eq!(bucket, "b");
            assert_eq!(prefix, "p/");
            assert_eq!(endpoint.as_deref(), Some("localhost"));
        }
    }
}