1use std::env;
2use std::fmt::Debug;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::TryStreamExt;
8use object_store::aws::AmazonS3Builder;
9use object_store::path::Path as ObjectPath;
10use object_store::{DynObjectStore, ObjectStore, PutPayload};
11use url::Url;
12
13use crate::error::{OmniError, Result};
14
/// URI prefix that marks a location as an explicit `file://` URL; such URIs are
/// converted to filesystem paths by `local_path_from_file_uri`.
const FILE_SCHEME_PREFIX: &str = "file://";
/// URI prefix that makes `storage_kind_for_uri` select the S3 backend.
const S3_SCHEME_PREFIX: &str = "s3://";
17
/// Text-oriented storage operations addressed by URI.
///
/// Implemented in this file by [`LocalStorageAdapter`] and [`S3StorageAdapter`].
#[async_trait]
pub trait StorageAdapter: Debug + Send + Sync {
    /// Reads the object at `uri` and returns its contents as a `String`.
    async fn read_text(&self, uri: &str) -> Result<String>;
    /// Writes `contents` to `uri`.
    async fn write_text(&self, uri: &str, contents: &str) -> Result<()>;
    /// Reports whether something exists at `uri`.
    async fn exists(&self, uri: &str) -> Result<bool>;
    /// Moves the object at `from_uri` to `to_uri`.
    async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()>;
    /// Removes the object at `uri`.
    ///
    /// Both adapters in this file treat an already-missing target as success.
    async fn delete(&self, uri: &str) -> Result<()>;
    /// Lists the entries directly under `dir_uri`, returned as URIs.
    ///
    /// Both adapters in this file return only direct children and skip anything
    /// nested deeper (subdirectories locally, keys with further `/` on S3).
    async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>>;
}
35
/// Storage backend selected for a URI by `storage_kind_for_uri`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageKind {
    /// Local filesystem: plain paths and `file://` URIs.
    Local,
    /// S3 object storage: URIs starting with `s3://`.
    S3,
}
41
/// Stateless [`StorageAdapter`] that performs its I/O through `tokio::fs`.
#[derive(Debug, Default)]
pub struct LocalStorageAdapter;
44
/// [`StorageAdapter`] bound to a single S3 bucket.
#[derive(Debug)]
pub struct S3StorageAdapter {
    /// Bucket this adapter serves; `object_path` rejects URIs naming any other bucket.
    bucket: String,
    /// Object-store client built by `from_root_uri`.
    store: Arc<DynObjectStore>,
}
50
/// Bucket/key pair produced by `parse_s3_uri`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct S3Location {
    /// Host component of the `s3://` URI.
    bucket: String,
    /// Path component of the URI with leading slashes removed; may be empty.
    key: String,
}
56
57#[async_trait]
58impl StorageAdapter for LocalStorageAdapter {
59 async fn read_text(&self, uri: &str) -> Result<String> {
60 let path = local_path_from_uri(uri)?;
61 Ok(tokio::fs::read_to_string(&path).await?)
62 }
63
64 async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
65 let path = local_path_from_uri(uri)?;
66 if let Some(parent) = path.parent() {
72 if !parent.as_os_str().is_empty() {
73 tokio::fs::create_dir_all(parent).await?;
74 }
75 }
76 tokio::fs::write(&path, contents).await?;
77 Ok(())
78 }
79
80 async fn exists(&self, uri: &str) -> Result<bool> {
81 Ok(local_path_from_uri(uri)?.exists())
82 }
83
84 async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
85 let from = local_path_from_uri(from_uri)?;
86 let to = local_path_from_uri(to_uri)?;
87 tokio::fs::rename(&from, &to).await?;
88 Ok(())
89 }
90
91 async fn delete(&self, uri: &str) -> Result<()> {
92 let path = local_path_from_uri(uri)?;
93 match tokio::fs::remove_file(&path).await {
94 Ok(()) => Ok(()),
95 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
96 Err(err) => Err(err.into()),
97 }
98 }
99
100 async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
101 let path = local_path_from_uri(dir_uri)?;
102 let mut out = Vec::new();
103 let mut entries = match tokio::fs::read_dir(&path).await {
104 Ok(e) => e,
105 Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(out),
106 Err(err) => return Err(err.into()),
107 };
108 let dir_str = dir_uri.trim_end_matches('/');
109 while let Some(entry) = entries.next_entry().await? {
110 let ft = entry.file_type().await?;
111 if !ft.is_file() {
112 continue;
113 }
114 if let Some(name) = entry.file_name().to_str() {
115 out.push(format!("{}/{}", dir_str, name));
116 }
117 }
118 Ok(out)
119 }
120}
121
122#[async_trait]
123impl StorageAdapter for S3StorageAdapter {
124 async fn read_text(&self, uri: &str) -> Result<String> {
125 let location = self.object_path(uri)?;
126 let bytes = self
127 .store
128 .get(&location)
129 .await
130 .map_err(|err| storage_backend_error("read", uri, err))?
131 .bytes()
132 .await
133 .map_err(|err| storage_backend_error("read", uri, err))?;
134
135 String::from_utf8(bytes.to_vec()).map_err(|err| {
136 OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err))
137 })
138 }
139
140 async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
141 let location = self.object_path(uri)?;
142 self.store
143 .put(&location, PutPayload::from(contents.as_bytes().to_vec()))
144 .await
145 .map_err(|err| storage_backend_error("write", uri, err))?;
146 Ok(())
147 }
148
149 async fn exists(&self, uri: &str) -> Result<bool> {
150 let location = self.object_path(uri)?;
151 match self.store.head(&location).await {
152 Ok(_) => Ok(true),
153 Err(object_store::Error::NotFound { .. }) => {
154 let mut entries = self.store.list(Some(&location));
155 let has_prefix_entries = entries
156 .try_next()
157 .await
158 .map_err(|err| storage_backend_error("exists", uri, err))?
159 .is_some();
160 Ok(has_prefix_entries)
161 }
162 Err(err) => Err(storage_backend_error("exists", uri, err)),
163 }
164 }
165
166 async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
167 let from = self.object_path(from_uri)?;
172 let to = self.object_path(to_uri)?;
173 self.store
174 .copy(&from, &to)
175 .await
176 .map_err(|err| storage_backend_error("rename:copy", from_uri, err))?;
177 self.store
178 .delete(&from)
179 .await
180 .map_err(|err| storage_backend_error("rename:delete", from_uri, err))?;
181 Ok(())
182 }
183
184 async fn delete(&self, uri: &str) -> Result<()> {
185 let location = self.object_path(uri)?;
186 match self.store.delete(&location).await {
187 Ok(()) => Ok(()),
188 Err(object_store::Error::NotFound { .. }) => Ok(()),
189 Err(err) => Err(storage_backend_error("delete", uri, err)),
190 }
191 }
192
193 async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
194 let dir_with_slash = if dir_uri.ends_with('/') {
198 dir_uri.to_string()
199 } else {
200 format!("{}/", dir_uri)
201 };
202 let prefix_loc = self.object_path(&dir_with_slash)?;
204 let prefix_with_slash = format!("{}/", prefix_loc.as_ref());
205
206 let mut entries = self.store.list(Some(&prefix_loc));
207 let mut out = Vec::new();
208 let bucket_root = format!("{}{}/", S3_SCHEME_PREFIX, self.bucket);
209 while let Some(meta) = entries
210 .try_next()
211 .await
212 .map_err(|err| storage_backend_error("list_dir", dir_uri, err))?
213 {
214 let key_str = meta.location.as_ref();
215 if !key_str.starts_with(&prefix_with_slash) {
218 continue;
219 }
220 let suffix = &key_str[prefix_with_slash.len()..];
221 if suffix.contains('/') {
223 continue;
224 }
225 out.push(format!("{}{}", bucket_root, key_str));
226 }
227 Ok(out)
228 }
229}
230
231impl S3StorageAdapter {
232 fn from_root_uri(root_uri: &str) -> Result<Self> {
233 let location = parse_s3_uri(root_uri)?;
234 let mut builder = AmazonS3Builder::from_env().with_bucket_name(&location.bucket);
235
236 if let Some(endpoint) = env::var("AWS_ENDPOINT_URL_S3")
237 .ok()
238 .or_else(|| env::var("AWS_ENDPOINT_URL").ok())
239 {
240 builder = builder.with_endpoint(&endpoint);
241 if endpoint.starts_with("http://") || env_var_truthy("AWS_ALLOW_HTTP") {
242 builder = builder.with_allow_http(true);
243 }
244 }
245
246 if env_var_truthy("AWS_S3_FORCE_PATH_STYLE") {
247 builder = builder.with_virtual_hosted_style_request(false);
248 }
249
250 let store = builder.build().map_err(|err| {
251 OmniError::manifest_internal(format!(
252 "failed to initialize s3 storage for '{}': {}",
253 root_uri, err
254 ))
255 })?;
256
257 Ok(Self {
258 bucket: location.bucket,
259 store: Arc::new(store),
260 })
261 }
262
263 fn object_path(&self, uri: &str) -> Result<ObjectPath> {
264 let location = parse_s3_uri(uri)?;
265 if location.bucket != self.bucket {
266 return Err(OmniError::manifest_internal(format!(
267 "s3 storage bucket mismatch for '{}': expected '{}', found '{}'",
268 uri, self.bucket, location.bucket
269 )));
270 }
271 if location.key.is_empty() {
272 return Err(OmniError::manifest_internal(format!(
273 "s3 storage path is empty for '{}'",
274 uri
275 )));
276 }
277 ObjectPath::parse(&location.key).map_err(|err| {
278 OmniError::manifest_internal(format!("invalid s3 object path for '{}': {}", uri, err))
279 })
280 }
281}
282
283pub fn storage_kind_for_uri(uri: &str) -> StorageKind {
284 if uri.starts_with(S3_SCHEME_PREFIX) {
285 StorageKind::S3
286 } else {
287 StorageKind::Local
288 }
289}
290
291pub fn storage_for_uri(uri: &str) -> Result<Arc<dyn StorageAdapter>> {
292 match storage_kind_for_uri(uri) {
293 StorageKind::Local => Ok(Arc::new(LocalStorageAdapter)),
294 StorageKind::S3 => Ok(Arc::new(S3StorageAdapter::from_root_uri(uri)?)),
295 }
296}
297
298pub fn normalize_root_uri(uri: &str) -> Result<String> {
299 match storage_kind_for_uri(uri) {
300 StorageKind::Local => {
301 let path = local_path_from_uri(uri)?;
302 Ok(normalize_local_path(&path))
303 }
304 StorageKind::S3 => Ok(trim_trailing_slashes(uri)),
305 }
306}
307
308pub fn join_uri(root_uri: &str, relative_path: &str) -> String {
309 let relative_path = relative_path.trim_start_matches('/');
310 match storage_kind_for_uri(root_uri) {
311 StorageKind::S3 => {
312 let root = trim_trailing_slashes(root_uri);
313 if root.is_empty() {
314 relative_path.to_string()
315 } else {
316 format!("{}/{}", root, relative_path)
317 }
318 }
319 StorageKind::Local => {
320 let root = if root_uri.starts_with(FILE_SCHEME_PREFIX) {
321 local_path_from_file_uri(root_uri)
322 .map(|path| normalize_local_path(&path))
323 .unwrap_or_else(|_| trim_trailing_slashes(root_uri))
324 } else {
325 normalize_local_path(Path::new(root_uri))
326 };
327 let joined = Path::new(&root).join(relative_path);
328 normalize_local_path(&joined)
329 }
330 }
331}
332
333fn local_path_from_uri(uri: &str) -> Result<PathBuf> {
334 if uri.starts_with(FILE_SCHEME_PREFIX) {
335 return local_path_from_file_uri(uri);
336 }
337 Ok(PathBuf::from(uri))
338}
339
340fn local_path_from_file_uri(uri: &str) -> Result<PathBuf> {
341 let url = Url::parse(uri).map_err(|err| {
342 OmniError::manifest_internal(format!("invalid file uri '{}': {}", uri, err))
343 })?;
344 url.to_file_path()
345 .map_err(|_| OmniError::manifest_internal(format!("invalid file uri '{}'", uri)))
346}
347
348fn parse_s3_uri(uri: &str) -> Result<S3Location> {
349 let url = Url::parse(uri).map_err(|err| {
350 OmniError::manifest_internal(format!("invalid s3 uri '{}': {}", uri, err))
351 })?;
352 if url.scheme() != "s3" {
353 return Err(OmniError::manifest_internal(format!(
354 "unsupported s3 uri '{}'",
355 uri
356 )));
357 }
358 let bucket = url
359 .host_str()
360 .ok_or_else(|| OmniError::manifest_internal(format!("missing s3 bucket in '{}'", uri)))?;
361 Ok(S3Location {
362 bucket: bucket.to_string(),
363 key: url.path().trim_start_matches('/').to_string(),
364 })
365}
366
367fn storage_backend_error(action: &str, uri: &str, err: impl std::fmt::Display) -> OmniError {
368 OmniError::manifest_internal(format!("storage {} failed for '{}': {}", action, uri, err))
369}
370
371fn normalize_local_path(path: &Path) -> String {
372 let raw = path.as_os_str().to_string_lossy();
373 if raw == "/" {
374 return raw.to_string();
375 }
376 trim_trailing_slashes(&raw)
377}
378
/// Removes every trailing `/` from `value`.
///
/// A value made up only of slashes (or an empty one) is returned unchanged, so
/// a root such as `/` is never reduced to an empty string.
fn trim_trailing_slashes(value: &str) -> String {
    match value.trim_end_matches('/') {
        "" => value.to_owned(),
        stripped => stripped.to_owned(),
    }
}
387
/// Reports whether the environment variable `key` holds a truthy value.
///
/// Truthy values are `1`, `true`, `yes` and `on`, compared without regard to
/// ASCII case, so `True`, `Yes` and `ON` are all accepted. An unset variable,
/// a value that is not valid Unicode, or any other value yields `false`.
fn env_var_truthy(key: &str) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    env::var(key)
        .map(|value| {
            TRUTHY
                .iter()
                .any(|candidate| value.eq_ignore_ascii_case(candidate))
        })
        .unwrap_or(false)
}
394
#[cfg(test)]
mod tests {
    use super::*;

    /// Plain paths and `file://` URIs select the local backend; `s3://` selects S3.
    #[test]
    fn storage_backend_selection_is_scheme_aware() {
        let cases = [
            ("/tmp/repo", StorageKind::Local),
            ("file:///tmp/repo", StorageKind::Local),
            ("s3://omnigraph-preview/repo", StorageKind::S3),
        ];
        for &(uri, expected) in &cases {
            assert_eq!(storage_kind_for_uri(uri), expected, "uri: {}", uri);
        }
    }

    /// Normalization strips trailing slashes and the `file://` prefix, keeping `s3://`.
    #[test]
    fn normalize_root_uri_preserves_local_and_s3_shapes() {
        let cases = [
            ("/tmp/omnigraph/", "/tmp/omnigraph"),
            ("file:///tmp/omnigraph/", "/tmp/omnigraph"),
            ("s3://bucket/prefix/", "s3://bucket/prefix"),
        ];
        for &(input, expected) in &cases {
            assert_eq!(
                normalize_root_uri(input).unwrap(),
                expected,
                "input: {}",
                input
            );
        }
    }

    /// Joining a file name works the same way for path, `file://` and `s3://` roots.
    #[test]
    fn join_uri_handles_local_file_and_s3_roots() {
        let cases = [
            ("/tmp/omnigraph", "/tmp/omnigraph/_schema.pg"),
            ("file:///tmp/omnigraph", "/tmp/omnigraph/_schema.pg"),
            ("s3://bucket/prefix", "s3://bucket/prefix/_schema.pg"),
        ];
        for &(root, expected) in &cases {
            assert_eq!(join_uri(root, "_schema.pg"), expected, "root: {}", root);
        }
    }

    /// The URL host becomes the bucket and the slash-stripped path becomes the key.
    #[test]
    fn parse_s3_uri_splits_bucket_and_key() {
        let S3Location { bucket, key } = parse_s3_uri("s3://bucket/repo/_schema.pg").unwrap();
        assert_eq!(bucket, "bucket");
        assert_eq!(key, "repo/_schema.pg");
    }
}