1use std::env;
2use std::fmt::Debug;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::TryStreamExt;
8use object_store::aws::AmazonS3Builder;
9use object_store::path::Path as ObjectPath;
10use object_store::{DynObjectStore, ObjectStore, PutPayload};
11use url::Url;
12
13use crate::error::{OmniError, Result};
14
/// URI prefix identifying a local location written as a `file://` URL.
const FILE_SCHEME_PREFIX: &str = "file://";
/// URI prefix that routes a URI to the S3 storage backend.
const S3_SCHEME_PREFIX: &str = "s3://";
17
/// Backend-agnostic text storage used for both local paths and `s3://` URIs.
///
/// Every method takes a complete location (a plain local path, a `file://` URI, or an
/// `s3://bucket/key` URI), not a path relative to some root.
#[async_trait]
pub trait StorageAdapter: Debug + Send + Sync {
    /// Reads the whole file/object at `uri` and returns it as UTF-8 text.
    async fn read_text(&self, uri: &str) -> Result<String>;
    /// Writes `contents` to `uri`, replacing whatever was stored there before.
    async fn write_text(&self, uri: &str, contents: &str) -> Result<()>;
    /// Reports whether `uri` exists.
    ///
    /// The S3 adapter in this module also answers `true` for a "directory" prefix that
    /// has objects beneath it.
    async fn exists(&self, uri: &str) -> Result<bool>;
    /// Moves the file/object at `from_uri` to `to_uri`.
    async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()>;
    /// Removes the file/object at `uri`.
    ///
    /// Both adapters in this module treat an already-missing target as success.
    async fn delete(&self, uri: &str) -> Result<()>;
}
31
/// Storage backend chosen for a URI by `storage_kind_for_uri`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageKind {
    /// Local filesystem: plain paths and `file://` URIs (anything not starting with `s3://`).
    Local,
    /// S3 (or an S3-compatible endpoint) addressed with `s3://bucket/key` URIs.
    S3,
}
37
/// Stateless [`StorageAdapter`] for plain filesystem paths and `file://` URIs, using `tokio::fs`.
#[derive(Debug, Default)]
pub struct LocalStorageAdapter;
40
/// [`StorageAdapter`] for `s3://` URIs, backed by an `object_store` Amazon S3 client.
#[derive(Debug)]
pub struct S3StorageAdapter {
    /// Bucket named by the root URI this adapter was built from; URIs naming any other
    /// bucket are rejected when converted to object paths.
    bucket: String,
    /// Object store client configured for `bucket`.
    store: Arc<DynObjectStore>,
}
46
/// An `s3://` URI split into its bucket and object key.
#[derive(Debug, Clone, PartialEq, Eq)]
struct S3Location {
    /// Bucket name, taken from the URI host.
    bucket: String,
    /// Object key: the URI path with leading slashes removed (empty for `s3://bucket`).
    /// It comes from `Url::path`, so characters the URL parser escapes are percent-encoded.
    key: String,
}
52
53#[async_trait]
54impl StorageAdapter for LocalStorageAdapter {
55 async fn read_text(&self, uri: &str) -> Result<String> {
56 let path = local_path_from_uri(uri)?;
57 Ok(tokio::fs::read_to_string(&path).await?)
58 }
59
60 async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
61 let path = local_path_from_uri(uri)?;
62 tokio::fs::write(&path, contents).await?;
63 Ok(())
64 }
65
66 async fn exists(&self, uri: &str) -> Result<bool> {
67 Ok(local_path_from_uri(uri)?.exists())
68 }
69
70 async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
71 let from = local_path_from_uri(from_uri)?;
72 let to = local_path_from_uri(to_uri)?;
73 tokio::fs::rename(&from, &to).await?;
74 Ok(())
75 }
76
77 async fn delete(&self, uri: &str) -> Result<()> {
78 let path = local_path_from_uri(uri)?;
79 match tokio::fs::remove_file(&path).await {
80 Ok(()) => Ok(()),
81 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
82 Err(err) => Err(err.into()),
83 }
84 }
85}
86
87#[async_trait]
88impl StorageAdapter for S3StorageAdapter {
89 async fn read_text(&self, uri: &str) -> Result<String> {
90 let location = self.object_path(uri)?;
91 let bytes = self
92 .store
93 .get(&location)
94 .await
95 .map_err(|err| storage_backend_error("read", uri, err))?
96 .bytes()
97 .await
98 .map_err(|err| storage_backend_error("read", uri, err))?;
99
100 String::from_utf8(bytes.to_vec()).map_err(|err| {
101 OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err))
102 })
103 }
104
105 async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
106 let location = self.object_path(uri)?;
107 self.store
108 .put(&location, PutPayload::from(contents.as_bytes().to_vec()))
109 .await
110 .map_err(|err| storage_backend_error("write", uri, err))?;
111 Ok(())
112 }
113
114 async fn exists(&self, uri: &str) -> Result<bool> {
115 let location = self.object_path(uri)?;
116 match self.store.head(&location).await {
117 Ok(_) => Ok(true),
118 Err(object_store::Error::NotFound { .. }) => {
119 let mut entries = self.store.list(Some(&location));
120 let has_prefix_entries = entries
121 .try_next()
122 .await
123 .map_err(|err| storage_backend_error("exists", uri, err))?
124 .is_some();
125 Ok(has_prefix_entries)
126 }
127 Err(err) => Err(storage_backend_error("exists", uri, err)),
128 }
129 }
130
131 async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
132 let from = self.object_path(from_uri)?;
137 let to = self.object_path(to_uri)?;
138 self.store
139 .copy(&from, &to)
140 .await
141 .map_err(|err| storage_backend_error("rename:copy", from_uri, err))?;
142 self.store
143 .delete(&from)
144 .await
145 .map_err(|err| storage_backend_error("rename:delete", from_uri, err))?;
146 Ok(())
147 }
148
149 async fn delete(&self, uri: &str) -> Result<()> {
150 let location = self.object_path(uri)?;
151 match self.store.delete(&location).await {
152 Ok(()) => Ok(()),
153 Err(object_store::Error::NotFound { .. }) => Ok(()),
154 Err(err) => Err(storage_backend_error("delete", uri, err)),
155 }
156 }
157}
158
159impl S3StorageAdapter {
160 fn from_root_uri(root_uri: &str) -> Result<Self> {
161 let location = parse_s3_uri(root_uri)?;
162 let mut builder = AmazonS3Builder::from_env().with_bucket_name(&location.bucket);
163
164 if let Some(endpoint) = env::var("AWS_ENDPOINT_URL_S3")
165 .ok()
166 .or_else(|| env::var("AWS_ENDPOINT_URL").ok())
167 {
168 builder = builder.with_endpoint(&endpoint);
169 if endpoint.starts_with("http://") || env_var_truthy("AWS_ALLOW_HTTP") {
170 builder = builder.with_allow_http(true);
171 }
172 }
173
174 if env_var_truthy("AWS_S3_FORCE_PATH_STYLE") {
175 builder = builder.with_virtual_hosted_style_request(false);
176 }
177
178 let store = builder.build().map_err(|err| {
179 OmniError::manifest_internal(format!(
180 "failed to initialize s3 storage for '{}': {}",
181 root_uri, err
182 ))
183 })?;
184
185 Ok(Self {
186 bucket: location.bucket,
187 store: Arc::new(store),
188 })
189 }
190
191 fn object_path(&self, uri: &str) -> Result<ObjectPath> {
192 let location = parse_s3_uri(uri)?;
193 if location.bucket != self.bucket {
194 return Err(OmniError::manifest_internal(format!(
195 "s3 storage bucket mismatch for '{}': expected '{}', found '{}'",
196 uri, self.bucket, location.bucket
197 )));
198 }
199 if location.key.is_empty() {
200 return Err(OmniError::manifest_internal(format!(
201 "s3 storage path is empty for '{}'",
202 uri
203 )));
204 }
205 ObjectPath::parse(&location.key).map_err(|err| {
206 OmniError::manifest_internal(format!("invalid s3 object path for '{}': {}", uri, err))
207 })
208 }
209}
210
211pub fn storage_kind_for_uri(uri: &str) -> StorageKind {
212 if uri.starts_with(S3_SCHEME_PREFIX) {
213 StorageKind::S3
214 } else {
215 StorageKind::Local
216 }
217}
218
219pub fn storage_for_uri(uri: &str) -> Result<Arc<dyn StorageAdapter>> {
220 match storage_kind_for_uri(uri) {
221 StorageKind::Local => Ok(Arc::new(LocalStorageAdapter)),
222 StorageKind::S3 => Ok(Arc::new(S3StorageAdapter::from_root_uri(uri)?)),
223 }
224}
225
226pub fn normalize_root_uri(uri: &str) -> Result<String> {
227 match storage_kind_for_uri(uri) {
228 StorageKind::Local => {
229 let path = local_path_from_uri(uri)?;
230 Ok(normalize_local_path(&path))
231 }
232 StorageKind::S3 => Ok(trim_trailing_slashes(uri)),
233 }
234}
235
236pub fn join_uri(root_uri: &str, relative_path: &str) -> String {
237 let relative_path = relative_path.trim_start_matches('/');
238 match storage_kind_for_uri(root_uri) {
239 StorageKind::S3 => {
240 let root = trim_trailing_slashes(root_uri);
241 if root.is_empty() {
242 relative_path.to_string()
243 } else {
244 format!("{}/{}", root, relative_path)
245 }
246 }
247 StorageKind::Local => {
248 let root = if root_uri.starts_with(FILE_SCHEME_PREFIX) {
249 local_path_from_file_uri(root_uri)
250 .map(|path| normalize_local_path(&path))
251 .unwrap_or_else(|_| trim_trailing_slashes(root_uri))
252 } else {
253 normalize_local_path(Path::new(root_uri))
254 };
255 let joined = Path::new(&root).join(relative_path);
256 normalize_local_path(&joined)
257 }
258 }
259}
260
261fn local_path_from_uri(uri: &str) -> Result<PathBuf> {
262 if uri.starts_with(FILE_SCHEME_PREFIX) {
263 return local_path_from_file_uri(uri);
264 }
265 Ok(PathBuf::from(uri))
266}
267
268fn local_path_from_file_uri(uri: &str) -> Result<PathBuf> {
269 let url = Url::parse(uri).map_err(|err| {
270 OmniError::manifest_internal(format!("invalid file uri '{}': {}", uri, err))
271 })?;
272 url.to_file_path()
273 .map_err(|_| OmniError::manifest_internal(format!("invalid file uri '{}'", uri)))
274}
275
276fn parse_s3_uri(uri: &str) -> Result<S3Location> {
277 let url = Url::parse(uri).map_err(|err| {
278 OmniError::manifest_internal(format!("invalid s3 uri '{}': {}", uri, err))
279 })?;
280 if url.scheme() != "s3" {
281 return Err(OmniError::manifest_internal(format!(
282 "unsupported s3 uri '{}'",
283 uri
284 )));
285 }
286 let bucket = url
287 .host_str()
288 .ok_or_else(|| OmniError::manifest_internal(format!("missing s3 bucket in '{}'", uri)))?;
289 Ok(S3Location {
290 bucket: bucket.to_string(),
291 key: url.path().trim_start_matches('/').to_string(),
292 })
293}
294
295fn storage_backend_error(action: &str, uri: &str, err: impl std::fmt::Display) -> OmniError {
296 OmniError::manifest_internal(format!("storage {} failed for '{}': {}", action, uri, err))
297}
298
/// Renders `path` as a string without trailing slashes.
///
/// Non-UTF-8 components are replaced lossily. An input made only of slashes (such as the
/// root `/`), or an empty input, is returned unchanged rather than collapsed to `""`.
fn normalize_local_path(path: &Path) -> String {
    let text = path.as_os_str().to_string_lossy();
    let stripped = text.trim_end_matches('/');
    if stripped.is_empty() {
        text.into_owned()
    } else {
        stripped.to_owned()
    }
}
306
/// Removes every trailing `/` from `value`.
///
/// If nothing but slashes would remain (including the empty string and `/` itself), the
/// original `value` is returned unchanged so a root is never turned into `""`.
fn trim_trailing_slashes(value: &str) -> String {
    match value.trim_end_matches('/') {
        "" => value.to_owned(),
        trimmed => trimmed.to_owned(),
    }
}
315
/// Reports whether environment variable `key` holds a recognized "enabled" value.
///
/// Accepted spellings are `1`, `true`/`TRUE`/`True`, `yes`/`YES` and `on`/`ON`. Unset
/// variables, values that are not valid Unicode, and any other value count as `false`.
fn env_var_truthy(key: &str) -> bool {
    const TRUTHY_VALUES: [&str; 8] = ["1", "true", "TRUE", "True", "yes", "YES", "on", "ON"];
    match env::var(key) {
        Ok(value) => TRUTHY_VALUES.contains(&value.as_str()),
        Err(_) => false,
    }
}
322
/// Unit tests for URI classification, normalization, joining and S3 URI parsing.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn storage_backend_selection_is_scheme_aware() {
        assert_eq!(storage_kind_for_uri("/tmp/repo"), StorageKind::Local);
        assert_eq!(storage_kind_for_uri("file:///tmp/repo"), StorageKind::Local);
        assert_eq!(
            storage_kind_for_uri("s3://omnigraph-preview/repo"),
            StorageKind::S3
        );
    }

    #[test]
    fn normalize_root_uri_preserves_local_and_s3_shapes() {
        assert_eq!(
            normalize_root_uri("/tmp/omnigraph/").unwrap(),
            "/tmp/omnigraph"
        );
        assert_eq!(
            normalize_root_uri("file:///tmp/omnigraph/").unwrap(),
            "/tmp/omnigraph"
        );
        assert_eq!(
            normalize_root_uri("s3://bucket/prefix/").unwrap(),
            "s3://bucket/prefix"
        );
    }

    #[test]
    fn join_uri_handles_local_file_and_s3_roots() {
        assert_eq!(
            join_uri("/tmp/omnigraph", "_schema.pg"),
            "/tmp/omnigraph/_schema.pg"
        );
        assert_eq!(
            join_uri("file:///tmp/omnigraph", "_schema.pg"),
            "/tmp/omnigraph/_schema.pg"
        );
        assert_eq!(
            join_uri("s3://bucket/prefix", "_schema.pg"),
            "s3://bucket/prefix/_schema.pg"
        );
    }

    #[test]
    fn join_uri_treats_relative_path_as_relative() {
        // A leading slash on the relative part must not escape (or replace) the root.
        assert_eq!(
            join_uri("/tmp/omnigraph/", "/nodes/a.lance"),
            "/tmp/omnigraph/nodes/a.lance"
        );
        assert_eq!(
            join_uri("s3://bucket/prefix/", "/nodes/a.lance"),
            "s3://bucket/prefix/nodes/a.lance"
        );
    }

    #[test]
    fn slash_trimming_never_collapses_the_root() {
        assert_eq!(trim_trailing_slashes("s3://bucket///"), "s3://bucket");
        assert_eq!(trim_trailing_slashes("/"), "/");
        assert_eq!(normalize_local_path(std::path::Path::new("/")), "/");
        assert_eq!(normalize_root_uri("/").unwrap(), "/");
    }

    #[test]
    fn parse_s3_uri_splits_bucket_and_key() {
        let location = parse_s3_uri("s3://bucket/repo/_schema.pg").unwrap();
        assert_eq!(location.bucket, "bucket");
        assert_eq!(location.key, "repo/_schema.pg");
    }

    #[test]
    fn parse_s3_uri_rejects_other_schemes_and_allows_bucket_root() {
        assert!(parse_s3_uri("file:///tmp/repo").is_err());
        let root = parse_s3_uri("s3://bucket").unwrap();
        assert_eq!(root.bucket, "bucket");
        assert_eq!(root.key, "");
    }
}