Skip to main content

omnigraph/
storage.rs

1use std::env;
2use std::fmt::Debug;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::TryStreamExt;
8use object_store::aws::AmazonS3Builder;
9use object_store::path::Path as ObjectPath;
10use object_store::{DynObjectStore, ObjectStore, PutPayload};
11use url::Url;
12
13use crate::error::{OmniError, Result};
14
/// URI prefix that marks a `file://` URI; anything else that is not an S3 URI
/// is treated as a plain filesystem path.
const FILE_SCHEME_PREFIX: &str = "file://";
/// URI prefix that selects the S3 backend (see `storage_kind_for_uri`).
const S3_SCHEME_PREFIX: &str = "s3://";
17
18#[async_trait]
19pub trait StorageAdapter: Debug + Send + Sync {
20    async fn read_text(&self, uri: &str) -> Result<String>;
21    async fn write_text(&self, uri: &str, contents: &str) -> Result<()>;
22    async fn exists(&self, uri: &str) -> Result<bool>;
23    /// Move a file from `from_uri` to `to_uri`, replacing any existing file at
24    /// `to_uri`. Atomic on local POSIX; on S3 implemented as copy + delete
25    /// (NOT atomic — callers that depend on atomicity for crash recovery must
26    /// tolerate "both source and destination exist after a crash").
27    async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()>;
28    /// Remove a file. Returns Ok(()) if the file does not exist.
29    async fn delete(&self, uri: &str) -> Result<()>;
30    /// List all files (non-recursively, files only) directly under `dir_uri`.
31    /// Returns full URIs (same scheme as `dir_uri`). The result is unordered.
32    /// Returns Ok(empty) if the directory does not exist or is empty.
33    async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>>;
34}
35
/// The storage backend a URI is routed to.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StorageKind {
    /// Local filesystem (plain paths and `file://` URIs).
    Local,
    /// S3-compatible object storage (`s3://` URIs).
    S3,
}
41
/// Stateless storage adapter for the local filesystem; every operation
/// resolves its URI argument to a path on each call.
#[derive(Default, Debug)]
pub struct LocalStorageAdapter;
44
45#[derive(Debug)]
46pub struct S3StorageAdapter {
47    bucket: String,
48    store: Arc<DynObjectStore>,
49}
50
/// An `s3://` URI split into its bucket and object key.
#[derive(Clone, Debug, Eq, PartialEq)]
struct S3Location {
    /// Bucket name (the host component of the URI).
    bucket: String,
    /// Object key (the URI path with leading slashes removed); may be empty.
    key: String,
}
56
57#[async_trait]
58impl StorageAdapter for LocalStorageAdapter {
59    async fn read_text(&self, uri: &str) -> Result<String> {
60        let path = local_path_from_uri(uri)?;
61        Ok(tokio::fs::read_to_string(&path).await?)
62    }
63
64    async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
65        let path = local_path_from_uri(uri)?;
66        // Ensure parent directory exists. S3 has no equivalent (PutObject
67        // is path-agnostic). For local fs, callers like the recovery
68        // sidecar protocol expect transparent directory creation under
69        // the repo root (the `__recovery/` directory doesn't pre-exist;
70        // first sidecar write creates it).
71        if let Some(parent) = path.parent() {
72            if !parent.as_os_str().is_empty() {
73                tokio::fs::create_dir_all(parent).await?;
74            }
75        }
76        tokio::fs::write(&path, contents).await?;
77        Ok(())
78    }
79
80    async fn exists(&self, uri: &str) -> Result<bool> {
81        Ok(local_path_from_uri(uri)?.exists())
82    }
83
84    async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
85        let from = local_path_from_uri(from_uri)?;
86        let to = local_path_from_uri(to_uri)?;
87        tokio::fs::rename(&from, &to).await?;
88        Ok(())
89    }
90
91    async fn delete(&self, uri: &str) -> Result<()> {
92        let path = local_path_from_uri(uri)?;
93        match tokio::fs::remove_file(&path).await {
94            Ok(()) => Ok(()),
95            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
96            Err(err) => Err(err.into()),
97        }
98    }
99
100    async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
101        let path = local_path_from_uri(dir_uri)?;
102        let mut out = Vec::new();
103        let mut entries = match tokio::fs::read_dir(&path).await {
104            Ok(e) => e,
105            Err(err) if err.kind() == std::io::ErrorKind::NotFound => return Ok(out),
106            Err(err) => return Err(err.into()),
107        };
108        let dir_str = dir_uri.trim_end_matches('/');
109        while let Some(entry) = entries.next_entry().await? {
110            let ft = entry.file_type().await?;
111            if !ft.is_file() {
112                continue;
113            }
114            if let Some(name) = entry.file_name().to_str() {
115                out.push(format!("{}/{}", dir_str, name));
116            }
117        }
118        Ok(out)
119    }
120}
121
122#[async_trait]
123impl StorageAdapter for S3StorageAdapter {
124    async fn read_text(&self, uri: &str) -> Result<String> {
125        let location = self.object_path(uri)?;
126        let bytes = self
127            .store
128            .get(&location)
129            .await
130            .map_err(|err| storage_backend_error("read", uri, err))?
131            .bytes()
132            .await
133            .map_err(|err| storage_backend_error("read", uri, err))?;
134
135        String::from_utf8(bytes.to_vec()).map_err(|err| {
136            OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err))
137        })
138    }
139
140    async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
141        let location = self.object_path(uri)?;
142        self.store
143            .put(&location, PutPayload::from(contents.as_bytes().to_vec()))
144            .await
145            .map_err(|err| storage_backend_error("write", uri, err))?;
146        Ok(())
147    }
148
149    async fn exists(&self, uri: &str) -> Result<bool> {
150        let location = self.object_path(uri)?;
151        match self.store.head(&location).await {
152            Ok(_) => Ok(true),
153            Err(object_store::Error::NotFound { .. }) => {
154                let mut entries = self.store.list(Some(&location));
155                let has_prefix_entries = entries
156                    .try_next()
157                    .await
158                    .map_err(|err| storage_backend_error("exists", uri, err))?
159                    .is_some();
160                Ok(has_prefix_entries)
161            }
162            Err(err) => Err(storage_backend_error("exists", uri, err)),
163        }
164    }
165
166    async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
167        // S3 has no atomic rename. Copy then delete; if the copy succeeds and
168        // the delete fails (or the process crashes between them), both
169        // source and destination exist with the same content. Recovery code
170        // must tolerate this case — see schema_state::recover_schema_state_files.
171        let from = self.object_path(from_uri)?;
172        let to = self.object_path(to_uri)?;
173        self.store
174            .copy(&from, &to)
175            .await
176            .map_err(|err| storage_backend_error("rename:copy", from_uri, err))?;
177        self.store
178            .delete(&from)
179            .await
180            .map_err(|err| storage_backend_error("rename:delete", from_uri, err))?;
181        Ok(())
182    }
183
184    async fn delete(&self, uri: &str) -> Result<()> {
185        let location = self.object_path(uri)?;
186        match self.store.delete(&location).await {
187            Ok(()) => Ok(()),
188            Err(object_store::Error::NotFound { .. }) => Ok(()),
189            Err(err) => Err(storage_backend_error("delete", uri, err)),
190        }
191    }
192
193    async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
194        // Normalize: ensure the URI describes a directory (trailing '/') so
195        // we don't match sibling paths with a shared prefix
196        // (e.g. listing `__recovery` shouldn't match `__recovery_log/...`).
197        let dir_with_slash = if dir_uri.ends_with('/') {
198            dir_uri.to_string()
199        } else {
200            format!("{}/", dir_uri)
201        };
202        // object_store::Path strips the trailing '/'; re-add it for filtering.
203        let prefix_loc = self.object_path(&dir_with_slash)?;
204        let prefix_with_slash = format!("{}/", prefix_loc.as_ref());
205
206        let mut entries = self.store.list(Some(&prefix_loc));
207        let mut out = Vec::new();
208        let bucket_root = format!("{}{}/", S3_SCHEME_PREFIX, self.bucket);
209        while let Some(meta) = entries
210            .try_next()
211            .await
212            .map_err(|err| storage_backend_error("list_dir", dir_uri, err))?
213        {
214            let key_str = meta.location.as_ref();
215            // Require the directory boundary to filter out sibling-prefix
216            // matches (object_store's `list` is prefix-based, not dir-based).
217            if !key_str.starts_with(&prefix_with_slash) {
218                continue;
219            }
220            let suffix = &key_str[prefix_with_slash.len()..];
221            // Non-recursive: skip anything inside a sub-directory.
222            if suffix.contains('/') {
223                continue;
224            }
225            out.push(format!("{}{}", bucket_root, key_str));
226        }
227        Ok(out)
228    }
229}
230
231impl S3StorageAdapter {
232    fn from_root_uri(root_uri: &str) -> Result<Self> {
233        let location = parse_s3_uri(root_uri)?;
234        let mut builder = AmazonS3Builder::from_env().with_bucket_name(&location.bucket);
235
236        if let Some(endpoint) = env::var("AWS_ENDPOINT_URL_S3")
237            .ok()
238            .or_else(|| env::var("AWS_ENDPOINT_URL").ok())
239        {
240            builder = builder.with_endpoint(&endpoint);
241            if endpoint.starts_with("http://") || env_var_truthy("AWS_ALLOW_HTTP") {
242                builder = builder.with_allow_http(true);
243            }
244        }
245
246        if env_var_truthy("AWS_S3_FORCE_PATH_STYLE") {
247            builder = builder.with_virtual_hosted_style_request(false);
248        }
249
250        let store = builder.build().map_err(|err| {
251            OmniError::manifest_internal(format!(
252                "failed to initialize s3 storage for '{}': {}",
253                root_uri, err
254            ))
255        })?;
256
257        Ok(Self {
258            bucket: location.bucket,
259            store: Arc::new(store),
260        })
261    }
262
263    fn object_path(&self, uri: &str) -> Result<ObjectPath> {
264        let location = parse_s3_uri(uri)?;
265        if location.bucket != self.bucket {
266            return Err(OmniError::manifest_internal(format!(
267                "s3 storage bucket mismatch for '{}': expected '{}', found '{}'",
268                uri, self.bucket, location.bucket
269            )));
270        }
271        if location.key.is_empty() {
272            return Err(OmniError::manifest_internal(format!(
273                "s3 storage path is empty for '{}'",
274                uri
275            )));
276        }
277        ObjectPath::parse(&location.key).map_err(|err| {
278            OmniError::manifest_internal(format!("invalid s3 object path for '{}': {}", uri, err))
279        })
280    }
281}
282
283pub fn storage_kind_for_uri(uri: &str) -> StorageKind {
284    if uri.starts_with(S3_SCHEME_PREFIX) {
285        StorageKind::S3
286    } else {
287        StorageKind::Local
288    }
289}
290
291pub fn storage_for_uri(uri: &str) -> Result<Arc<dyn StorageAdapter>> {
292    match storage_kind_for_uri(uri) {
293        StorageKind::Local => Ok(Arc::new(LocalStorageAdapter)),
294        StorageKind::S3 => Ok(Arc::new(S3StorageAdapter::from_root_uri(uri)?)),
295    }
296}
297
298pub fn normalize_root_uri(uri: &str) -> Result<String> {
299    match storage_kind_for_uri(uri) {
300        StorageKind::Local => {
301            let path = local_path_from_uri(uri)?;
302            Ok(normalize_local_path(&path))
303        }
304        StorageKind::S3 => Ok(trim_trailing_slashes(uri)),
305    }
306}
307
308pub fn join_uri(root_uri: &str, relative_path: &str) -> String {
309    let relative_path = relative_path.trim_start_matches('/');
310    match storage_kind_for_uri(root_uri) {
311        StorageKind::S3 => {
312            let root = trim_trailing_slashes(root_uri);
313            if root.is_empty() {
314                relative_path.to_string()
315            } else {
316                format!("{}/{}", root, relative_path)
317            }
318        }
319        StorageKind::Local => {
320            let root = if root_uri.starts_with(FILE_SCHEME_PREFIX) {
321                local_path_from_file_uri(root_uri)
322                    .map(|path| normalize_local_path(&path))
323                    .unwrap_or_else(|_| trim_trailing_slashes(root_uri))
324            } else {
325                normalize_local_path(Path::new(root_uri))
326            };
327            let joined = Path::new(&root).join(relative_path);
328            normalize_local_path(&joined)
329        }
330    }
331}
332
333fn local_path_from_uri(uri: &str) -> Result<PathBuf> {
334    if uri.starts_with(FILE_SCHEME_PREFIX) {
335        return local_path_from_file_uri(uri);
336    }
337    Ok(PathBuf::from(uri))
338}
339
340fn local_path_from_file_uri(uri: &str) -> Result<PathBuf> {
341    let url = Url::parse(uri).map_err(|err| {
342        OmniError::manifest_internal(format!("invalid file uri '{}': {}", uri, err))
343    })?;
344    url.to_file_path()
345        .map_err(|_| OmniError::manifest_internal(format!("invalid file uri '{}'", uri)))
346}
347
348fn parse_s3_uri(uri: &str) -> Result<S3Location> {
349    let url = Url::parse(uri).map_err(|err| {
350        OmniError::manifest_internal(format!("invalid s3 uri '{}': {}", uri, err))
351    })?;
352    if url.scheme() != "s3" {
353        return Err(OmniError::manifest_internal(format!(
354            "unsupported s3 uri '{}'",
355            uri
356        )));
357    }
358    let bucket = url
359        .host_str()
360        .ok_or_else(|| OmniError::manifest_internal(format!("missing s3 bucket in '{}'", uri)))?;
361    Ok(S3Location {
362        bucket: bucket.to_string(),
363        key: url.path().trim_start_matches('/').to_string(),
364    })
365}
366
367fn storage_backend_error(action: &str, uri: &str, err: impl std::fmt::Display) -> OmniError {
368    OmniError::manifest_internal(format!("storage {} failed for '{}': {}", action, uri, err))
369}
370
371fn normalize_local_path(path: &Path) -> String {
372    let raw = path.as_os_str().to_string_lossy();
373    if raw == "/" {
374        return raw.to_string();
375    }
376    trim_trailing_slashes(&raw)
377}
378
/// Strip every trailing `/` from `value`.
///
/// If stripping would leave nothing (the input is empty or consists solely
/// of slashes), the input is returned unchanged instead.
fn trim_trailing_slashes(value: &str) -> String {
    match value.trim_end_matches('/') {
        "" => value.to_owned(),
        kept => kept.to_owned(),
    }
}
387
/// Return `true` when the environment variable `key` is set to a truthy
/// value.
///
/// Truthy values are `1`, `true`, `yes` and `on`, compared ASCII
/// case-insensitively after trimming surrounding whitespace. An unset
/// variable, a non-Unicode value, or any other value yields `false`.
fn env_var_truthy(key: &str) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];
    env::var(key)
        .map(|value| {
            let value = value.trim();
            TRUTHY
                .iter()
                .any(|candidate| value.eq_ignore_ascii_case(candidate))
        })
        .unwrap_or(false)
}
394
#[cfg(test)]
mod tests {
    use super::*;

    /// Only the `s3://` prefix selects the S3 backend.
    #[test]
    fn storage_backend_selection_is_scheme_aware() {
        let cases = [
            ("/tmp/repo", StorageKind::Local),
            ("file:///tmp/repo", StorageKind::Local),
            ("s3://omnigraph-preview/repo", StorageKind::S3),
        ];
        for (uri, expected) in cases.iter() {
            assert_eq!(storage_kind_for_uri(uri), *expected);
        }
    }

    /// Trailing slashes are dropped; `file://` roots collapse to plain paths.
    #[test]
    fn normalize_root_uri_preserves_local_and_s3_shapes() {
        let cases = [
            ("/tmp/omnigraph/", "/tmp/omnigraph"),
            ("file:///tmp/omnigraph/", "/tmp/omnigraph"),
            ("s3://bucket/prefix/", "s3://bucket/prefix"),
        ];
        for (input, expected) in cases.iter() {
            assert_eq!(normalize_root_uri(input).unwrap(), *expected);
        }
    }

    /// Joining works for plain paths, `file://` roots and S3 roots.
    #[test]
    fn join_uri_handles_local_file_and_s3_roots() {
        let cases = [
            ("/tmp/omnigraph", "/tmp/omnigraph/_schema.pg"),
            ("file:///tmp/omnigraph", "/tmp/omnigraph/_schema.pg"),
            ("s3://bucket/prefix", "s3://bucket/prefix/_schema.pg"),
        ];
        for (root, expected) in cases.iter() {
            assert_eq!(join_uri(root, "_schema.pg"), *expected);
        }
    }

    /// The host becomes the bucket and the path (sans leading `/`) the key.
    #[test]
    fn parse_s3_uri_splits_bucket_and_key() {
        let S3Location { bucket, key } = parse_s3_uri("s3://bucket/repo/_schema.pg").unwrap();
        assert_eq!(bucket, "bucket");
        assert_eq!(key, "repo/_schema.pg");
    }
}
447}