use std::env;
use std::fmt::Debug;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use async_trait::async_trait;
use futures::TryStreamExt;
use object_store::aws::AmazonS3Builder;
use object_store::path::Path as ObjectPath;
use object_store::{DynObjectStore, ObjectStore, PutPayload};
use url::Url;
use crate::error::{OmniError, Result};
/// URI prefix for explicit local-filesystem URIs (`file:///...`); such URIs are converted
/// to filesystem paths before use.
const FILE_SCHEME_PREFIX: &str = "file://";
/// URI prefix that routes a URI to the S3 storage backend.
const S3_SCHEME_PREFIX: &str = "s3://";
/// Minimal async text-storage interface implemented by the local-filesystem and S3 backends.
///
/// Every method receives a full URI (a bare path, a `file://` URI, or an `s3://bucket/key`
/// URI, depending on the implementation), not a path relative to some root.
#[async_trait]
pub trait StorageAdapter: Debug + Send + Sync {
/// Reads the entire resource at `uri` as text.
async fn read_text(&self, uri: &str) -> Result<String>;
/// Writes `contents` as the resource at `uri`.
async fn write_text(&self, uri: &str, contents: &str) -> Result<()>;
/// Reports whether something exists at `uri`.
///
/// The S3 implementation also returns `true` when `uri` names a key prefix with at least
/// one object beneath it (a "directory").
async fn exists(&self, uri: &str) -> Result<bool>;
}
/// Storage backend selected for a URI by `storage_kind_for_uri`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageKind {
/// Local filesystem: bare paths and `file://` URIs.
Local,
/// S3 or an S3-compatible endpoint: `s3://bucket/key` URIs.
S3,
}
/// Stateless adapter that reads and writes the local filesystem through `tokio::fs`.
///
/// Accepts bare paths as well as `file://` URIs.
#[derive(Debug, Default)]
pub struct LocalStorageAdapter;
/// Adapter backed by an `object_store` S3 client that is bound to a single bucket.
#[derive(Debug)]
pub struct S3StorageAdapter {
/// Bucket the client was built for; `S3StorageAdapter::object_path` rejects URIs that
/// name any other bucket.
bucket: String,
/// Shared `object_store` client used for every request.
store: Arc<DynObjectStore>,
}
/// An `s3://bucket/key` URI split into its bucket and key.
#[derive(Debug, Clone, PartialEq, Eq)]
struct S3Location {
/// Bucket name, taken from the URI host.
bucket: String,
/// Object key: the URI path with leading `/` removed; empty for a bucket-only URI.
/// NOTE(review): taken from `Url::path()`, so presumably still percent-encoded — confirm.
key: String,
}
#[async_trait]
impl StorageAdapter for LocalStorageAdapter {
    /// Reads the file at `uri` (bare path or `file://` URI) into a string.
    ///
    /// I/O errors are propagated through `?` into the crate error type.
    async fn read_text(&self, uri: &str) -> Result<String> {
        let path = local_path_from_uri(uri)?;
        Ok(tokio::fs::read_to_string(&path).await?)
    }

    /// Writes `contents` to the file at `uri` (bare path or `file://` URI).
    ///
    /// Parent directories are not created here.
    async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
        let path = local_path_from_uri(uri)?;
        tokio::fs::write(&path, contents).await?;
        Ok(())
    }

    /// Returns whether the path addressed by `uri` exists.
    ///
    /// Only an invalid `file://` URI produces an error; any failure to stat the path
    /// (missing, permission denied, broken symlink) yields `Ok(false)`.
    async fn exists(&self, uri: &str) -> Result<bool> {
        let path = local_path_from_uri(uri)?;
        // `Path::exists` is a synchronous stat that would block the async executor; the
        // tokio metadata call offloads it. `.is_ok()` mirrors `Path::exists`, which is
        // defined as `metadata(path).is_ok()`, so the result is unchanged.
        Ok(tokio::fs::metadata(&path).await.is_ok())
    }
}
#[async_trait]
impl StorageAdapter for S3StorageAdapter {
async fn read_text(&self, uri: &str) -> Result<String> {
let location = self.object_path(uri)?;
let bytes = self
.store
.get(&location)
.await
.map_err(|err| storage_backend_error("read", uri, err))?
.bytes()
.await
.map_err(|err| storage_backend_error("read", uri, err))?;
String::from_utf8(bytes.to_vec()).map_err(|err| {
OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err))
})
}
async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
let location = self.object_path(uri)?;
self.store
.put(&location, PutPayload::from(contents.as_bytes().to_vec()))
.await
.map_err(|err| storage_backend_error("write", uri, err))?;
Ok(())
}
async fn exists(&self, uri: &str) -> Result<bool> {
let location = self.object_path(uri)?;
match self.store.head(&location).await {
Ok(_) => Ok(true),
Err(object_store::Error::NotFound { .. }) => {
let mut entries = self.store.list(Some(&location));
let has_prefix_entries = entries
.try_next()
.await
.map_err(|err| storage_backend_error("exists", uri, err))?
.is_some();
Ok(has_prefix_entries)
}
Err(err) => Err(storage_backend_error("exists", uri, err)),
}
}
}
impl S3StorageAdapter {
    /// Builds an adapter bound to the bucket named in `root_uri`.
    ///
    /// Configuration starts from `AmazonS3Builder::from_env()` and is then adjusted:
    /// - The endpoint is overridden by `AWS_ENDPOINT_URL_S3` or, failing that, by
    ///   `AWS_ENDPOINT_URL`. Blank values count as unset.
    /// - When an endpoint override is present, plain HTTP is allowed if that endpoint uses
    ///   `http://` (any case) or `AWS_ALLOW_HTTP` is truthy.
    /// - A truthy `AWS_S3_FORCE_PATH_STYLE` disables virtual-hosted-style requests.
    ///
    /// Only the bucket of `root_uri` is kept; its key prefix is not stored.
    ///
    /// # Errors
    /// Fails when `root_uri` is not a valid `s3://bucket/...` URI or the client cannot be built.
    fn from_root_uri(root_uri: &str) -> Result<Self> {
        let location = parse_s3_uri(root_uri)?;
        let mut builder = AmazonS3Builder::from_env().with_bucket_name(&location.bucket);
        // Prefer the S3-specific override. A blank value must not shadow the generic fallback.
        let endpoint = ["AWS_ENDPOINT_URL_S3", "AWS_ENDPOINT_URL"]
            .iter()
            .filter_map(|key| env::var(key).ok())
            .map(|value| value.trim().to_string())
            .find(|value| !value.is_empty());
        if let Some(endpoint) = endpoint {
            // URL schemes are case-insensitive, so `HTTP://host` must also enable plain HTTP.
            // `get` (not slicing) avoids a panic on a non-char-boundary index.
            let plain_http = endpoint
                .get(..7)
                .map_or(false, |scheme| scheme.eq_ignore_ascii_case("http://"));
            builder = builder.with_endpoint(&endpoint);
            if plain_http || env_var_truthy("AWS_ALLOW_HTTP") {
                builder = builder.with_allow_http(true);
            }
        }
        if env_var_truthy("AWS_S3_FORCE_PATH_STYLE") {
            builder = builder.with_virtual_hosted_style_request(false);
        }
        let store = builder.build().map_err(|err| {
            OmniError::manifest_internal(format!(
                "failed to initialize s3 storage for '{}': {}",
                root_uri, err
            ))
        })?;
        Ok(Self {
            bucket: location.bucket,
            store: Arc::new(store),
        })
    }

    /// Converts an `s3://` URI into an `object_store` path inside this adapter's bucket.
    ///
    /// # Errors
    /// Fails in any of these cases:
    /// - the URI does not parse;
    /// - it names a different bucket;
    /// - it has an empty key;
    /// - the decoded key is not a valid object path (e.g. contains `..` segments).
    fn object_path(&self, uri: &str) -> Result<ObjectPath> {
        let location = parse_s3_uri(uri)?;
        if location.bucket != self.bucket {
            return Err(OmniError::manifest_internal(format!(
                "s3 storage bucket mismatch for '{}': expected '{}', found '{}'",
                uri, self.bucket, location.bucket
            )));
        }
        if location.key.is_empty() {
            return Err(OmniError::manifest_internal(format!(
                "s3 storage path is empty for '{}'",
                uri
            )));
        }
        // The key comes from `Url::path()`, which is percent-encoded (a space is `%20`).
        // Decoding it makes the object key match the real S3 key. `Path::parse` would keep
        // the escape sequences literally, and object_store would then encode them again.
        ObjectPath::from_url_path(&location.key).map_err(|err| {
            OmniError::manifest_internal(format!("invalid s3 object path for '{}': {}", uri, err))
        })
    }
}
/// Classifies `uri` by its scheme prefix.
///
/// Only a literal `s3://` prefix selects [`StorageKind::S3`]. Everything else, including bare
/// paths and `file://` URIs, is treated as [`StorageKind::Local`].
pub fn storage_kind_for_uri(uri: &str) -> StorageKind {
    match uri.strip_prefix(S3_SCHEME_PREFIX) {
        Some(_) => StorageKind::S3,
        None => StorageKind::Local,
    }
}
/// Creates the storage adapter that matches the scheme of `uri`.
///
/// # Errors
/// For `s3://` URIs, propagates any failure to build the S3 client; local storage never fails.
pub fn storage_for_uri(uri: &str) -> Result<Arc<dyn StorageAdapter>> {
    let adapter: Arc<dyn StorageAdapter> = match storage_kind_for_uri(uri) {
        StorageKind::S3 => Arc::new(S3StorageAdapter::from_root_uri(uri)?),
        StorageKind::Local => Arc::new(LocalStorageAdapter),
    };
    Ok(adapter)
}
/// Canonicalizes a repository root URI.
///
/// - S3 URIs keep their `s3://` form, with trailing `/` removed.
/// - Local roots (bare paths or `file://` URIs) become plain filesystem path strings without
///   trailing `/`; the root `/` itself is preserved.
///
/// # Errors
/// Fails only for a `file://` URI that cannot be converted to a path.
pub fn normalize_root_uri(uri: &str) -> Result<String> {
    let local_path = match storage_kind_for_uri(uri) {
        StorageKind::S3 => return Ok(trim_trailing_slashes(uri)),
        StorageKind::Local => local_path_from_uri(uri)?,
    };
    Ok(normalize_local_path(&local_path))
}
/// Appends `relative_path` to `root_uri`, keeping the root's storage flavour.
///
/// Leading `/` on `relative_path` is ignored, so it can never replace the root.
/// - S3 roots: trailing `/` on the root is dropped, and the parts are joined with one `/`.
/// - Local roots: `file://` roots are first converted to a filesystem path, falling back to
///   the raw string if that fails. The result is a normalized local path string, not a URI.
///
/// An empty (or all-`/`) `relative_path` yields the normalized root for both flavours.
pub fn join_uri(root_uri: &str, relative_path: &str) -> String {
    let relative_path = relative_path.trim_start_matches('/');
    match storage_kind_for_uri(root_uri) {
        StorageKind::S3 => {
            // `root_uri` starts with "s3://", so trimming can never leave it empty.
            let root = trim_trailing_slashes(root_uri);
            if relative_path.is_empty() {
                // Joining nothing returns the root itself, matching the local branch,
                // instead of leaving a dangling trailing separator.
                root
            } else {
                format!("{}/{}", root, relative_path)
            }
        }
        StorageKind::Local => {
            let root = if root_uri.starts_with(FILE_SCHEME_PREFIX) {
                local_path_from_file_uri(root_uri)
                    .map(|path| normalize_local_path(&path))
                    .unwrap_or_else(|_| trim_trailing_slashes(root_uri))
            } else {
                normalize_local_path(Path::new(root_uri))
            };
            let joined = Path::new(&root).join(relative_path);
            normalize_local_path(&joined)
        }
    }
}
/// Resolves a local URI to a filesystem path.
///
/// `file://` URIs are decoded through URL parsing; anything else is used verbatim as a path.
fn local_path_from_uri(uri: &str) -> Result<PathBuf> {
    if uri.starts_with(FILE_SCHEME_PREFIX) {
        local_path_from_file_uri(uri)
    } else {
        Ok(PathBuf::from(uri))
    }
}
/// Converts a `file://` URI into a filesystem path.
///
/// # Errors
/// Fails when `uri` does not parse as a URL, or when `Url::to_file_path` rejects it.
fn local_path_from_file_uri(uri: &str) -> Result<PathBuf> {
    let parsed = match Url::parse(uri) {
        Ok(url) => url,
        Err(err) => {
            return Err(OmniError::manifest_internal(format!(
                "invalid file uri '{}': {}",
                uri, err
            )))
        }
    };
    parsed
        .to_file_path()
        .map_err(|()| OmniError::manifest_internal(format!("invalid file uri '{}'", uri)))
}
/// Splits an `s3://bucket/key` URI into its bucket and key.
///
/// The key is `Url::path()` with leading `/` removed; it is empty for `s3://bucket`.
///
/// # Errors
/// Fails when `uri` is not a parseable URL, its scheme is not `s3`, or its bucket is
/// missing or empty (e.g. `s3:///key`).
fn parse_s3_uri(uri: &str) -> Result<S3Location> {
    let url = Url::parse(uri).map_err(|err| {
        OmniError::manifest_internal(format!("invalid s3 uri '{}': {}", uri, err))
    })?;
    if url.scheme() != "s3" {
        return Err(OmniError::manifest_internal(format!(
            "unsupported s3 uri '{}'",
            uri
        )));
    }
    // An empty authority (`s3:///key`) may surface as `Some("")` rather than `None`;
    // either way there is no usable bucket, so reject both.
    let bucket = url
        .host_str()
        .filter(|bucket| !bucket.is_empty())
        .ok_or_else(|| OmniError::manifest_internal(format!("missing s3 bucket in '{}'", uri)))?;
    Ok(S3Location {
        bucket: bucket.to_string(),
        key: url.path().trim_start_matches('/').to_string(),
    })
}
/// Wraps a backend failure for `action` on `uri` in a manifest-internal error,
/// using the message shape `storage <action> failed for '<uri>': <err>`.
fn storage_backend_error(action: &str, uri: &str, err: impl std::fmt::Display) -> OmniError {
    let message = format!("storage {} failed for '{}': {}", action, uri, err);
    OmniError::manifest_internal(message)
}
/// Renders `path` as a string (lossily for non-UTF-8 paths) without trailing `/`.
///
/// The bare root `/` survives unchanged, because `trim_trailing_slashes` returns
/// all-slash input as-is.
fn normalize_local_path(path: &Path) -> String {
    trim_trailing_slashes(&path.to_string_lossy())
}
/// Returns `value` without trailing `/` characters.
///
/// Input consisting only of slashes (such as `/`), or empty input, is returned unchanged,
/// so a root is never reduced to an empty string.
fn trim_trailing_slashes(value: &str) -> String {
    let kept = match value.trim_end_matches('/') {
        "" => value,
        stripped => stripped,
    };
    kept.to_owned()
}
/// Returns `true` when environment variable `key` is set to `1`, `true`, `yes`, or `on`.
///
/// Matching ignores ASCII case. The previous hand-listed casings accepted `True` but not
/// `Yes`/`On`; full case-insensitivity is consistent with how `object_store` parses boolean
/// config. An unset or non-UTF-8 variable counts as false.
fn env_var_truthy(key: &str) -> bool {
    env::var(key).map_or(false, |value| {
        ["1", "true", "yes", "on"]
            .iter()
            .any(|truthy| value.eq_ignore_ascii_case(truthy))
    })
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn storage_backend_selection_is_scheme_aware() {
        assert_eq!(storage_kind_for_uri("/tmp/repo"), StorageKind::Local);
        assert_eq!(storage_kind_for_uri("file:///tmp/repo"), StorageKind::Local);
        assert_eq!(
            storage_kind_for_uri("s3://omnigraph-preview/repo"),
            StorageKind::S3
        );
    }

    #[test]
    fn normalize_root_uri_preserves_local_and_s3_shapes() {
        assert_eq!(
            normalize_root_uri("/tmp/omnigraph/").unwrap(),
            "/tmp/omnigraph"
        );
        assert_eq!(
            normalize_root_uri("file:///tmp/omnigraph/").unwrap(),
            "/tmp/omnigraph"
        );
        assert_eq!(
            normalize_root_uri("s3://bucket/prefix/").unwrap(),
            "s3://bucket/prefix"
        );
    }

    #[test]
    fn normalize_root_uri_keeps_filesystem_root() {
        // Trailing-slash trimming must not reduce "/" to an empty string.
        assert_eq!(normalize_root_uri("/").unwrap(), "/");
    }

    #[test]
    fn join_uri_handles_local_file_and_s3_roots() {
        assert_eq!(
            join_uri("/tmp/omnigraph", "_schema.pg"),
            "/tmp/omnigraph/_schema.pg"
        );
        assert_eq!(
            join_uri("file:///tmp/omnigraph", "_schema.pg"),
            "/tmp/omnigraph/_schema.pg"
        );
        assert_eq!(
            join_uri("s3://bucket/prefix", "_schema.pg"),
            "s3://bucket/prefix/_schema.pg"
        );
    }

    #[test]
    fn join_uri_strips_redundant_separators() {
        // A trailing slash on the root and a leading slash on the relative path collapse to
        // a single separator; the leading slash must not make the relative path absolute.
        assert_eq!(
            join_uri("/tmp/omnigraph/", "/_schema.pg"),
            "/tmp/omnigraph/_schema.pg"
        );
        assert_eq!(
            join_uri("s3://bucket/prefix/", "/_schema.pg"),
            "s3://bucket/prefix/_schema.pg"
        );
    }

    #[test]
    fn trim_trailing_slashes_keeps_all_slash_input() {
        assert_eq!(trim_trailing_slashes("/"), "/");
        assert_eq!(trim_trailing_slashes("a//"), "a");
    }

    #[test]
    fn parse_s3_uri_splits_bucket_and_key() {
        let location = parse_s3_uri("s3://bucket/repo/_schema.pg").unwrap();
        assert_eq!(location.bucket, "bucket");
        assert_eq!(location.key, "repo/_schema.pg");
    }

    #[test]
    fn parse_s3_uri_handles_bucket_only_and_rejects_other_schemes() {
        let location = parse_s3_uri("s3://bucket").unwrap();
        assert_eq!(location.bucket, "bucket");
        assert_eq!(location.key, "");
        assert!(parse_s3_uri("gs://bucket/key").is_err());
    }
}