Skip to main content

omnigraph/
storage.rs

1use std::env;
2use std::fmt::Debug;
3use std::path::{Path, PathBuf};
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::TryStreamExt;
8use object_store::aws::AmazonS3Builder;
9use object_store::path::Path as ObjectPath;
10use object_store::{DynObjectStore, ObjectStore, PutPayload};
11use url::Url;
12
13use crate::error::{OmniError, Result};
14
/// Prefix of URIs that explicitly address the local filesystem.
const FILE_SCHEME_PREFIX: &'static str = "file://";
/// Prefix of URIs that are routed to the S3 storage backend.
const S3_SCHEME_PREFIX: &'static str = "s3://";
17
18#[async_trait]
19pub trait StorageAdapter: Debug + Send + Sync {
20    async fn read_text(&self, uri: &str) -> Result<String>;
21    async fn write_text(&self, uri: &str, contents: &str) -> Result<()>;
22    async fn exists(&self, uri: &str) -> Result<bool>;
23    /// Move a file from `from_uri` to `to_uri`, replacing any existing file at
24    /// `to_uri`. Atomic on local POSIX; on S3 implemented as copy + delete
25    /// (NOT atomic — callers that depend on atomicity for crash recovery must
26    /// tolerate "both source and destination exist after a crash").
27    async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()>;
28    /// Remove a file. Returns Ok(()) if the file does not exist.
29    async fn delete(&self, uri: &str) -> Result<()>;
30}
31
/// The storage backend a URI is routed to (see `storage_kind_for_uri`).
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum StorageKind {
    /// Any URI that does not use the `s3://` scheme, including plain paths
    /// and `file://` URIs.
    Local,
    /// `s3://bucket/key` URIs, served by AWS S3 or an S3-compatible endpoint.
    S3,
}
37
38#[derive(Debug, Default)]
39pub struct LocalStorageAdapter;
40
41#[derive(Debug)]
42pub struct S3StorageAdapter {
43    bucket: String,
44    store: Arc<DynObjectStore>,
45}
46
47#[derive(Debug, Clone, PartialEq, Eq)]
48struct S3Location {
49    bucket: String,
50    key: String,
51}
52
53#[async_trait]
54impl StorageAdapter for LocalStorageAdapter {
55    async fn read_text(&self, uri: &str) -> Result<String> {
56        let path = local_path_from_uri(uri)?;
57        Ok(tokio::fs::read_to_string(&path).await?)
58    }
59
60    async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
61        let path = local_path_from_uri(uri)?;
62        tokio::fs::write(&path, contents).await?;
63        Ok(())
64    }
65
66    async fn exists(&self, uri: &str) -> Result<bool> {
67        Ok(local_path_from_uri(uri)?.exists())
68    }
69
70    async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
71        let from = local_path_from_uri(from_uri)?;
72        let to = local_path_from_uri(to_uri)?;
73        tokio::fs::rename(&from, &to).await?;
74        Ok(())
75    }
76
77    async fn delete(&self, uri: &str) -> Result<()> {
78        let path = local_path_from_uri(uri)?;
79        match tokio::fs::remove_file(&path).await {
80            Ok(()) => Ok(()),
81            Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
82            Err(err) => Err(err.into()),
83        }
84    }
85}
86
87#[async_trait]
88impl StorageAdapter for S3StorageAdapter {
89    async fn read_text(&self, uri: &str) -> Result<String> {
90        let location = self.object_path(uri)?;
91        let bytes = self
92            .store
93            .get(&location)
94            .await
95            .map_err(|err| storage_backend_error("read", uri, err))?
96            .bytes()
97            .await
98            .map_err(|err| storage_backend_error("read", uri, err))?;
99
100        String::from_utf8(bytes.to_vec()).map_err(|err| {
101            OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err))
102        })
103    }
104
105    async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
106        let location = self.object_path(uri)?;
107        self.store
108            .put(&location, PutPayload::from(contents.as_bytes().to_vec()))
109            .await
110            .map_err(|err| storage_backend_error("write", uri, err))?;
111        Ok(())
112    }
113
114    async fn exists(&self, uri: &str) -> Result<bool> {
115        let location = self.object_path(uri)?;
116        match self.store.head(&location).await {
117            Ok(_) => Ok(true),
118            Err(object_store::Error::NotFound { .. }) => {
119                let mut entries = self.store.list(Some(&location));
120                let has_prefix_entries = entries
121                    .try_next()
122                    .await
123                    .map_err(|err| storage_backend_error("exists", uri, err))?
124                    .is_some();
125                Ok(has_prefix_entries)
126            }
127            Err(err) => Err(storage_backend_error("exists", uri, err)),
128        }
129    }
130
131    async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
132        // S3 has no atomic rename. Copy then delete; if the copy succeeds and
133        // the delete fails (or the process crashes between them), both
134        // source and destination exist with the same content. Recovery code
135        // must tolerate this case — see schema_state::recover_schema_state_files.
136        let from = self.object_path(from_uri)?;
137        let to = self.object_path(to_uri)?;
138        self.store
139            .copy(&from, &to)
140            .await
141            .map_err(|err| storage_backend_error("rename:copy", from_uri, err))?;
142        self.store
143            .delete(&from)
144            .await
145            .map_err(|err| storage_backend_error("rename:delete", from_uri, err))?;
146        Ok(())
147    }
148
149    async fn delete(&self, uri: &str) -> Result<()> {
150        let location = self.object_path(uri)?;
151        match self.store.delete(&location).await {
152            Ok(()) => Ok(()),
153            Err(object_store::Error::NotFound { .. }) => Ok(()),
154            Err(err) => Err(storage_backend_error("delete", uri, err)),
155        }
156    }
157}
158
159impl S3StorageAdapter {
160    fn from_root_uri(root_uri: &str) -> Result<Self> {
161        let location = parse_s3_uri(root_uri)?;
162        let mut builder = AmazonS3Builder::from_env().with_bucket_name(&location.bucket);
163
164        if let Some(endpoint) = env::var("AWS_ENDPOINT_URL_S3")
165            .ok()
166            .or_else(|| env::var("AWS_ENDPOINT_URL").ok())
167        {
168            builder = builder.with_endpoint(&endpoint);
169            if endpoint.starts_with("http://") || env_var_truthy("AWS_ALLOW_HTTP") {
170                builder = builder.with_allow_http(true);
171            }
172        }
173
174        if env_var_truthy("AWS_S3_FORCE_PATH_STYLE") {
175            builder = builder.with_virtual_hosted_style_request(false);
176        }
177
178        let store = builder.build().map_err(|err| {
179            OmniError::manifest_internal(format!(
180                "failed to initialize s3 storage for '{}': {}",
181                root_uri, err
182            ))
183        })?;
184
185        Ok(Self {
186            bucket: location.bucket,
187            store: Arc::new(store),
188        })
189    }
190
191    fn object_path(&self, uri: &str) -> Result<ObjectPath> {
192        let location = parse_s3_uri(uri)?;
193        if location.bucket != self.bucket {
194            return Err(OmniError::manifest_internal(format!(
195                "s3 storage bucket mismatch for '{}': expected '{}', found '{}'",
196                uri, self.bucket, location.bucket
197            )));
198        }
199        if location.key.is_empty() {
200            return Err(OmniError::manifest_internal(format!(
201                "s3 storage path is empty for '{}'",
202                uri
203            )));
204        }
205        ObjectPath::parse(&location.key).map_err(|err| {
206            OmniError::manifest_internal(format!("invalid s3 object path for '{}': {}", uri, err))
207        })
208    }
209}
210
211pub fn storage_kind_for_uri(uri: &str) -> StorageKind {
212    if uri.starts_with(S3_SCHEME_PREFIX) {
213        StorageKind::S3
214    } else {
215        StorageKind::Local
216    }
217}
218
219pub fn storage_for_uri(uri: &str) -> Result<Arc<dyn StorageAdapter>> {
220    match storage_kind_for_uri(uri) {
221        StorageKind::Local => Ok(Arc::new(LocalStorageAdapter)),
222        StorageKind::S3 => Ok(Arc::new(S3StorageAdapter::from_root_uri(uri)?)),
223    }
224}
225
226pub fn normalize_root_uri(uri: &str) -> Result<String> {
227    match storage_kind_for_uri(uri) {
228        StorageKind::Local => {
229            let path = local_path_from_uri(uri)?;
230            Ok(normalize_local_path(&path))
231        }
232        StorageKind::S3 => Ok(trim_trailing_slashes(uri)),
233    }
234}
235
236pub fn join_uri(root_uri: &str, relative_path: &str) -> String {
237    let relative_path = relative_path.trim_start_matches('/');
238    match storage_kind_for_uri(root_uri) {
239        StorageKind::S3 => {
240            let root = trim_trailing_slashes(root_uri);
241            if root.is_empty() {
242                relative_path.to_string()
243            } else {
244                format!("{}/{}", root, relative_path)
245            }
246        }
247        StorageKind::Local => {
248            let root = if root_uri.starts_with(FILE_SCHEME_PREFIX) {
249                local_path_from_file_uri(root_uri)
250                    .map(|path| normalize_local_path(&path))
251                    .unwrap_or_else(|_| trim_trailing_slashes(root_uri))
252            } else {
253                normalize_local_path(Path::new(root_uri))
254            };
255            let joined = Path::new(&root).join(relative_path);
256            normalize_local_path(&joined)
257        }
258    }
259}
260
261fn local_path_from_uri(uri: &str) -> Result<PathBuf> {
262    if uri.starts_with(FILE_SCHEME_PREFIX) {
263        return local_path_from_file_uri(uri);
264    }
265    Ok(PathBuf::from(uri))
266}
267
268fn local_path_from_file_uri(uri: &str) -> Result<PathBuf> {
269    let url = Url::parse(uri).map_err(|err| {
270        OmniError::manifest_internal(format!("invalid file uri '{}': {}", uri, err))
271    })?;
272    url.to_file_path()
273        .map_err(|_| OmniError::manifest_internal(format!("invalid file uri '{}'", uri)))
274}
275
276fn parse_s3_uri(uri: &str) -> Result<S3Location> {
277    let url = Url::parse(uri).map_err(|err| {
278        OmniError::manifest_internal(format!("invalid s3 uri '{}': {}", uri, err))
279    })?;
280    if url.scheme() != "s3" {
281        return Err(OmniError::manifest_internal(format!(
282            "unsupported s3 uri '{}'",
283            uri
284        )));
285    }
286    let bucket = url
287        .host_str()
288        .ok_or_else(|| OmniError::manifest_internal(format!("missing s3 bucket in '{}'", uri)))?;
289    Ok(S3Location {
290        bucket: bucket.to_string(),
291        key: url.path().trim_start_matches('/').to_string(),
292    })
293}
294
295fn storage_backend_error(action: &str, uri: &str, err: impl std::fmt::Display) -> OmniError {
296    OmniError::manifest_internal(format!("storage {} failed for '{}': {}", action, uri, err))
297}
298
299fn normalize_local_path(path: &Path) -> String {
300    let raw = path.as_os_str().to_string_lossy();
301    if raw == "/" {
302        return raw.to_string();
303    }
304    trim_trailing_slashes(&raw)
305}
306
/// Strip trailing `/` characters from `value`.
///
/// If nothing would remain (the input is empty or made up only of slashes),
/// the input is returned unchanged, so `/` never collapses to `""`.
fn trim_trailing_slashes(value: &str) -> String {
    match value.trim_end_matches('/') {
        "" => value.to_owned(),
        trimmed => trimmed.to_owned(),
    }
}
315
/// Return `true` when environment variable `key` holds a truthy value.
///
/// Accepted spellings, compared case-insensitively, are `1`, `true`, `yes`,
/// `y` and `on`. These are the spellings `object_store` accepts for its own
/// boolean settings (e.g. `AWS_ALLOW_HTTP`), so both layers interpret a
/// variable the same way. An unset variable, a non-Unicode value, or any
/// other value yields `false`.
fn env_var_truthy(key: &str) -> bool {
    const TRUTHY: [&str; 5] = ["1", "true", "yes", "y", "on"];
    match env::var(key) {
        Ok(value) => TRUTHY
            .iter()
            .any(|truthy| value.eq_ignore_ascii_case(truthy)),
        Err(_) => false,
    }
}
322
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn storage_backend_selection_is_scheme_aware() {
        assert_eq!(storage_kind_for_uri("/tmp/repo"), StorageKind::Local);
        assert_eq!(storage_kind_for_uri("file:///tmp/repo"), StorageKind::Local);
        assert_eq!(
            storage_kind_for_uri("s3://omnigraph-preview/repo"),
            StorageKind::S3
        );
    }

    #[test]
    fn storage_kind_defaults_to_local_for_non_s3_uris() {
        assert_eq!(storage_kind_for_uri("relative/repo"), StorageKind::Local);
        assert_eq!(storage_kind_for_uri("gs://bucket/repo"), StorageKind::Local);
    }

    #[test]
    fn normalize_root_uri_preserves_local_and_s3_shapes() {
        assert_eq!(
            normalize_root_uri("/tmp/omnigraph/").unwrap(),
            "/tmp/omnigraph"
        );
        assert_eq!(
            normalize_root_uri("file:///tmp/omnigraph/").unwrap(),
            "/tmp/omnigraph"
        );
        assert_eq!(
            normalize_root_uri("s3://bucket/prefix/").unwrap(),
            "s3://bucket/prefix"
        );
    }

    #[test]
    fn normalize_root_uri_keeps_filesystem_root() {
        assert_eq!(normalize_root_uri("/").unwrap(), "/");
    }

    #[test]
    fn trim_trailing_slashes_never_collapses_to_empty() {
        assert_eq!(
            trim_trailing_slashes("s3://bucket/prefix//"),
            "s3://bucket/prefix"
        );
        assert_eq!(trim_trailing_slashes("/"), "/");
        assert_eq!(trim_trailing_slashes("///"), "///");
        assert_eq!(trim_trailing_slashes(""), "");
    }

    #[test]
    fn join_uri_handles_local_file_and_s3_roots() {
        assert_eq!(
            join_uri("/tmp/omnigraph", "_schema.pg"),
            "/tmp/omnigraph/_schema.pg"
        );
        assert_eq!(
            join_uri("file:///tmp/omnigraph", "_schema.pg"),
            "/tmp/omnigraph/_schema.pg"
        );
        assert_eq!(
            join_uri("s3://bucket/prefix", "_schema.pg"),
            "s3://bucket/prefix/_schema.pg"
        );
    }

    #[test]
    fn join_uri_ignores_leading_and_trailing_slashes() {
        assert_eq!(
            join_uri("/tmp/omnigraph/", "/nested/_schema.pg"),
            "/tmp/omnigraph/nested/_schema.pg"
        );
        assert_eq!(
            join_uri("s3://bucket/prefix/", "/_schema.pg"),
            "s3://bucket/prefix/_schema.pg"
        );
    }

    #[test]
    fn parse_s3_uri_splits_bucket_and_key() {
        let location = parse_s3_uri("s3://bucket/repo/_schema.pg").unwrap();
        assert_eq!(location.bucket, "bucket");
        assert_eq!(location.key, "repo/_schema.pg");
    }

    #[test]
    fn parse_s3_uri_allows_bucket_root() {
        let location = parse_s3_uri("s3://bucket").unwrap();
        assert_eq!(location.bucket, "bucket");
        assert_eq!(location.key, "");
    }

    #[test]
    fn parse_s3_uri_rejects_non_s3_input() {
        assert!(parse_s3_uri("gs://bucket/repo").is_err());
        assert!(parse_s3_uri("not a uri").is_err());
    }
}