1use std::env;
2use std::fmt::Debug;
3use std::path::{Component, Path, PathBuf};
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use futures::TryStreamExt;
8use object_store::aws::AmazonS3Builder;
9use object_store::local::LocalFileSystem;
10use object_store::memory::InMemory;
11use object_store::path::Path as ObjectPath;
12use object_store::{DynObjectStore, ObjectStore, ObjectStoreExt, PutMode, PutPayload};
13use url::Url;
14
15use crate::error::{OmniError, Result};
16
/// Prefix that marks a URI as a `file://` URL rather than a plain filesystem path.
const FILE_SCHEME_PREFIX: &str = "file://";
/// Prefix that routes a URI to the S3 backend (see `storage_kind_for_uri`).
const S3_SCHEME_PREFIX: &str = "s3://";
19
/// Async, text-oriented storage interface addressed by URI strings.
///
/// The per-method notes describe the behavior of `ObjectStorageAdapter`, the
/// implementation visible in this file; other implementors are only bound by
/// the signatures.
#[async_trait]
pub trait StorageAdapter: Debug + Send + Sync {
    /// Reads the object at `uri` and returns its contents as UTF-8 text.
    ///
    /// Fails when the backend read fails or the bytes are not valid UTF-8.
    async fn read_text(&self, uri: &str) -> Result<String>;

    /// Writes `contents` to `uri` unconditionally.
    async fn write_text(&self, uri: &str, contents: &str) -> Result<()>;

    /// Writes `contents` to `uri` only if nothing is stored there yet.
    ///
    /// Returns `true` when the object was created and `false` when the backend
    /// reported that it already exists.
    async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool>;

    /// Reports whether `uri` names an object, or a prefix that has at least
    /// one entry listed under it.
    async fn exists(&self, uri: &str) -> Result<bool>;

    /// Moves the object at `from_uri` to `to_uri`.
    async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()>;

    /// Deletes the object at `uri`; a missing object is treated as success.
    async fn delete(&self, uri: &str) -> Result<()>;

    /// Lists the objects directly under `dir_uri`.
    ///
    /// Each entry is returned as `dir_uri` (trailing slashes removed) joined
    /// with the object's file name by `/`. Nested prefixes are not returned.
    async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>>;

    /// Reads the object at `uri` and returns `(text, version_token)`.
    ///
    /// The token is what `write_text_if_match` expects as `expected_version`.
    async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)>;

    /// Replaces the object at `uri` with `contents` only when its current
    /// version equals `expected_version`.
    ///
    /// Returns `Some(new_version)` when the write happened, and `None` when
    /// the version did not match or the object does not exist.
    async fn write_text_if_match(
        &self,
        uri: &str,
        contents: &str,
        expected_version: &str,
    ) -> Result<Option<String>>;

    /// Deletes everything stored under `prefix_uri`; a missing prefix is
    /// treated as success.
    async fn delete_prefix(&self, prefix_uri: &str) -> Result<()>;
}
79
80fn local_version_token(bytes: &[u8]) -> String {
85 use sha2::{Digest, Sha256};
86 let digest = Sha256::digest(bytes);
87 digest.iter().map(|byte| format!("{byte:02x}")).collect()
88}
89
/// Storage backend chosen for a URI (see `storage_kind_for_uri`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StorageKind {
    /// Local filesystem; selected for any URI that does not start with `s3://`.
    Local,
    /// S3 backend; selected for URIs that start with `s3://`.
    S3,
}
95
/// `StorageAdapter` implementation backed by an `object_store` store.
#[derive(Debug)]
pub struct ObjectStorageAdapter {
    /// Backend that performs the actual object operations.
    store: Arc<DynObjectStore>,
    /// Strategy for translating a URI string into an object path for `store`.
    codec: UriCodec,
    /// When `true`, `write_text_if_match` issues a conditional put keyed on the
    /// ETag; when `false`, it compares a SHA-256 content token and then writes.
    supports_conditional_update: bool,
}
113
/// How `ObjectStorageAdapter::object_path` turns a URI into an object path.
#[derive(Debug, Clone, PartialEq, Eq)]
enum UriCodec {
    /// Plain paths or `file://` URIs, resolved to an absolute filesystem path.
    Local,
    /// `s3://bucket/key` URIs; the URI's bucket must equal `bucket`.
    S3 { bucket: String },
    /// The URI itself, minus leading slashes, is used as the object path.
    Memory,
}
125
/// Bucket/key pair produced by `parse_s3_uri`.
#[derive(Debug, Clone, PartialEq, Eq)]
struct S3Location {
    /// Host component of the `s3://` URI.
    bucket: String,
    /// Path component of the URI with leading slashes removed (may be empty).
    key: String,
}
131
132impl ObjectStorageAdapter {
133 pub fn local() -> Self {
137 Self {
138 store: Arc::new(LocalFileSystem::new()),
139 codec: UriCodec::Local,
140 supports_conditional_update: false,
141 }
142 }
143
144 pub fn s3_from_root_uri(root_uri: &str) -> Result<Self> {
148 let location = parse_s3_uri(root_uri)?;
149 let mut builder = AmazonS3Builder::from_env().with_bucket_name(&location.bucket);
150
151 if let Some(endpoint) = env::var("AWS_ENDPOINT_URL_S3")
152 .ok()
153 .or_else(|| env::var("AWS_ENDPOINT_URL").ok())
154 {
155 builder = builder.with_endpoint(&endpoint);
156 if endpoint.starts_with("http://") || env_var_truthy("AWS_ALLOW_HTTP") {
157 builder = builder.with_allow_http(true);
158 }
159 }
160
161 if env_var_truthy("AWS_S3_FORCE_PATH_STYLE") {
162 builder = builder.with_virtual_hosted_style_request(false);
163 }
164
165 let store = builder.build().map_err(|err| {
166 OmniError::manifest_internal(format!(
167 "failed to initialize s3 storage for '{}': {}",
168 root_uri, err
169 ))
170 })?;
171
172 Ok(Self {
173 store: Arc::new(store),
174 codec: UriCodec::S3 {
175 bucket: location.bucket,
176 },
177 supports_conditional_update: true,
178 })
179 }
180
181 pub fn in_memory() -> Self {
186 Self {
187 store: Arc::new(InMemory::new()),
188 codec: UriCodec::Memory,
189 supports_conditional_update: true,
190 }
191 }
192
193 fn object_path(&self, uri: &str) -> Result<ObjectPath> {
194 match &self.codec {
195 UriCodec::Local => {
196 let path = absolutize_lexically(local_path_from_uri(uri)?)?;
197 ObjectPath::from_absolute_path(&path).map_err(|err| {
198 OmniError::manifest_internal(format!(
199 "invalid local object path for '{}': {}",
200 uri, err
201 ))
202 })
203 }
204 UriCodec::S3 { bucket } => {
205 let location = parse_s3_uri(uri)?;
206 if &location.bucket != bucket {
207 return Err(OmniError::manifest_internal(format!(
208 "s3 storage bucket mismatch for '{}': expected '{}', found '{}'",
209 uri, bucket, location.bucket
210 )));
211 }
212 if location.key.is_empty() {
213 return Err(OmniError::manifest_internal(format!(
214 "s3 storage path is empty for '{}'",
215 uri
216 )));
217 }
218 ObjectPath::parse(&location.key).map_err(|err| {
219 OmniError::manifest_internal(format!(
220 "invalid s3 object path for '{}': {}",
221 uri, err
222 ))
223 })
224 }
225 UriCodec::Memory => {
226 ObjectPath::parse(uri.trim_start_matches('/')).map_err(|err| {
227 OmniError::manifest_internal(format!(
228 "invalid memory object path for '{}': {}",
229 uri, err
230 ))
231 })
232 }
233 }
234 }
235}
236
237#[async_trait]
238impl StorageAdapter for ObjectStorageAdapter {
239 async fn read_text(&self, uri: &str) -> Result<String> {
240 let location = self.object_path(uri)?;
241 let bytes = self
242 .store
243 .get(&location)
244 .await
245 .map_err(|err| storage_backend_error("read", uri, err))?
246 .bytes()
247 .await
248 .map_err(|err| storage_backend_error("read", uri, err))?;
249
250 String::from_utf8(bytes.to_vec()).map_err(|err| {
251 OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err))
252 })
253 }
254
255 async fn write_text(&self, uri: &str, contents: &str) -> Result<()> {
256 let location = self.object_path(uri)?;
262 self.store
263 .put(&location, PutPayload::from(contents.as_bytes().to_vec()))
264 .await
265 .map_err(|err| storage_backend_error("write", uri, err))?;
266 Ok(())
267 }
268
269 async fn write_text_if_absent(&self, uri: &str, contents: &str) -> Result<bool> {
270 let location = self.object_path(uri)?;
277 match self
278 .store
279 .put_opts(
280 &location,
281 PutPayload::from(contents.as_bytes().to_vec()),
282 PutMode::Create.into(),
283 )
284 .await
285 {
286 Ok(_) => Ok(true),
287 Err(object_store::Error::AlreadyExists { .. })
288 | Err(object_store::Error::Precondition { .. }) => Ok(false),
289 Err(err) => Err(storage_backend_error("write_if_absent", uri, err)),
290 }
291 }
292
293 async fn exists(&self, uri: &str) -> Result<bool> {
294 let location = self.object_path(uri)?;
301 match self.store.head(&location).await {
302 Ok(_) => Ok(true),
303 Err(object_store::Error::NotFound { .. }) => {
304 let mut entries = self.store.list(Some(&location));
305 let has_prefix_entries = entries
306 .try_next()
307 .await
308 .map_err(|err| storage_backend_error("exists", uri, err))?
309 .is_some();
310 Ok(has_prefix_entries)
311 }
312 Err(err) => Err(storage_backend_error("exists", uri, err)),
313 }
314 }
315
316 async fn rename_text(&self, from_uri: &str, to_uri: &str) -> Result<()> {
317 let from = self.object_path(from_uri)?;
324 let to = self.object_path(to_uri)?;
325 self.store
326 .rename(&from, &to)
327 .await
328 .map_err(|err| storage_backend_error("rename", from_uri, err))?;
329 Ok(())
330 }
331
332 async fn delete(&self, uri: &str) -> Result<()> {
333 let location = self.object_path(uri)?;
334 match self.store.delete(&location).await {
335 Ok(()) => Ok(()),
336 Err(object_store::Error::NotFound { .. }) => Ok(()),
337 Err(err) => Err(storage_backend_error("delete", uri, err)),
338 }
339 }
340
341 async fn list_dir(&self, dir_uri: &str) -> Result<Vec<String>> {
342 let anchor = dir_uri.trim_end_matches('/');
350 let prefix = self.object_path(anchor)?;
351 let listing = self
352 .store
353 .list_with_delimiter(Some(&prefix))
354 .await
355 .map_err(|err| storage_backend_error("list_dir", dir_uri, err))?;
356 let mut out = Vec::with_capacity(listing.objects.len());
357 for meta in listing.objects {
358 if let Some(name) = meta.location.filename() {
359 out.push(format!("{}/{}", anchor, name));
360 }
361 }
362 Ok(out)
363 }
364
365 async fn read_text_versioned(&self, uri: &str) -> Result<(String, String)> {
366 let location = self.object_path(uri)?;
367 let result = self
368 .store
369 .get(&location)
370 .await
371 .map_err(|err| storage_backend_error("read", uri, err))?;
372 let etag = result.meta.e_tag.clone();
373 let bytes = result
374 .bytes()
375 .await
376 .map_err(|err| storage_backend_error("read", uri, err))?;
377 let version = if self.supports_conditional_update {
382 etag.unwrap_or_else(|| local_version_token(&bytes))
385 } else {
386 local_version_token(&bytes)
387 };
388 let text = String::from_utf8(bytes.to_vec()).map_err(|err| {
389 OmniError::manifest_internal(format!("storage read failed for '{}': {}", uri, err))
390 })?;
391 Ok((text, version))
392 }
393
394 async fn write_text_if_match(
395 &self,
396 uri: &str,
397 contents: &str,
398 expected_version: &str,
399 ) -> Result<Option<String>> {
400 let location = self.object_path(uri)?;
401 if self.supports_conditional_update {
402 let mode = PutMode::Update(object_store::UpdateVersion {
403 e_tag: Some(expected_version.to_string()),
404 version: None,
405 });
406 return match self
407 .store
408 .put_opts(
409 &location,
410 PutPayload::from(contents.as_bytes().to_vec()),
411 mode.into(),
412 )
413 .await
414 {
415 Ok(result) => Ok(Some(
416 result
417 .e_tag
418 .unwrap_or_else(|| local_version_token(contents.as_bytes())),
419 )),
420 Err(object_store::Error::Precondition { .. })
421 | Err(object_store::Error::NotFound { .. }) => Ok(None),
422 Err(err) => Err(storage_backend_error("write_if_match", uri, err)),
423 };
424 }
425 let current = match self.store.get(&location).await {
429 Ok(result) => result
430 .bytes()
431 .await
432 .map_err(|err| storage_backend_error("read", uri, err))?,
433 Err(object_store::Error::NotFound { .. }) => return Ok(None),
434 Err(err) => return Err(storage_backend_error("read", uri, err)),
435 };
436 if local_version_token(¤t) != expected_version {
437 return Ok(None);
438 }
439 self.store
440 .put(&location, PutPayload::from(contents.as_bytes().to_vec()))
441 .await
442 .map_err(|err| storage_backend_error("write_if_match", uri, err))?;
443 Ok(Some(local_version_token(contents.as_bytes())))
444 }
445
446 async fn delete_prefix(&self, prefix_uri: &str) -> Result<()> {
447 if self.codec == UriCodec::Local {
452 let path = absolutize_lexically(local_path_from_uri(prefix_uri)?)?;
453 return match tokio::fs::remove_dir_all(&path).await {
454 Ok(()) => Ok(()),
455 Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(()),
456 Err(err) => Err(err.into()),
457 };
458 }
459 let prefix = self.object_path(prefix_uri.trim_end_matches('/'))?;
460 let mut entries = self.store.list(Some(&prefix));
461 let mut locations = Vec::new();
462 while let Some(meta) = entries
463 .try_next()
464 .await
465 .map_err(|err| storage_backend_error("delete_prefix", prefix_uri, err))?
466 {
467 locations.push(meta.location);
468 }
469 for location in locations {
470 match self.store.delete(&location).await {
471 Ok(()) => {}
472 Err(object_store::Error::NotFound { .. }) => {}
473 Err(err) => return Err(storage_backend_error("delete_prefix", prefix_uri, err)),
474 }
475 }
476 Ok(())
477 }
478}
479
480pub fn storage_kind_for_uri(uri: &str) -> StorageKind {
481 if uri.starts_with(S3_SCHEME_PREFIX) {
482 StorageKind::S3
483 } else {
484 StorageKind::Local
485 }
486}
487
488pub fn storage_for_uri(uri: &str) -> Result<Arc<dyn StorageAdapter>> {
489 match storage_kind_for_uri(uri) {
490 StorageKind::Local => Ok(Arc::new(ObjectStorageAdapter::local())),
491 StorageKind::S3 => Ok(Arc::new(ObjectStorageAdapter::s3_from_root_uri(uri)?)),
492 }
493}
494
495pub fn normalize_root_uri(uri: &str) -> Result<String> {
496 match storage_kind_for_uri(uri) {
497 StorageKind::Local => {
498 let path = local_path_from_uri(uri)?;
499 Ok(normalize_local_path(&path))
500 }
501 StorageKind::S3 => Ok(trim_trailing_slashes(uri)),
502 }
503}
504
505pub fn join_uri(root_uri: &str, relative_path: &str) -> String {
506 let relative_path = relative_path.trim_start_matches('/');
507 match storage_kind_for_uri(root_uri) {
508 StorageKind::S3 => {
509 let root = trim_trailing_slashes(root_uri);
510 if root.is_empty() {
511 relative_path.to_string()
512 } else {
513 format!("{}/{}", root, relative_path)
514 }
515 }
516 StorageKind::Local => {
517 let root = if root_uri.starts_with(FILE_SCHEME_PREFIX) {
518 local_path_from_file_uri(root_uri)
519 .map(|path| normalize_local_path(&path))
520 .unwrap_or_else(|_| trim_trailing_slashes(root_uri))
521 } else {
522 normalize_local_path(Path::new(root_uri))
523 };
524 let joined = Path::new(&root).join(relative_path);
525 normalize_local_path(&joined)
526 }
527 }
528}
529
530fn local_path_from_uri(uri: &str) -> Result<PathBuf> {
531 if uri.starts_with(FILE_SCHEME_PREFIX) {
532 return local_path_from_file_uri(uri);
533 }
534 Ok(PathBuf::from(uri))
535}
536
537fn absolutize_lexically(path: PathBuf) -> Result<PathBuf> {
543 let joined = if path.is_absolute() {
544 path
545 } else {
546 std::env::current_dir()
547 .map_err(|err| {
548 OmniError::manifest_internal(format!(
549 "cannot resolve relative storage path '{}': {}",
550 path.display(),
551 err
552 ))
553 })?
554 .join(path)
555 };
556 let mut out = PathBuf::new();
557 for component in joined.components() {
558 match component {
559 Component::CurDir => {}
560 Component::ParentDir => {
561 out.pop();
562 }
563 other => out.push(other),
564 }
565 }
566 Ok(out)
567}
568
569fn local_path_from_file_uri(uri: &str) -> Result<PathBuf> {
570 let url = Url::parse(uri).map_err(|err| {
571 OmniError::manifest_internal(format!("invalid file uri '{}': {}", uri, err))
572 })?;
573 url.to_file_path()
574 .map_err(|_| OmniError::manifest_internal(format!("invalid file uri '{}'", uri)))
575}
576
577fn parse_s3_uri(uri: &str) -> Result<S3Location> {
578 let url = Url::parse(uri).map_err(|err| {
579 OmniError::manifest_internal(format!("invalid s3 uri '{}': {}", uri, err))
580 })?;
581 if url.scheme() != "s3" {
582 return Err(OmniError::manifest_internal(format!(
583 "unsupported s3 uri '{}'",
584 uri
585 )));
586 }
587 let bucket = url
588 .host_str()
589 .ok_or_else(|| OmniError::manifest_internal(format!("missing s3 bucket in '{}'", uri)))?;
590 Ok(S3Location {
591 bucket: bucket.to_string(),
592 key: url.path().trim_start_matches('/').to_string(),
593 })
594}
595
596fn storage_backend_error(action: &str, uri: &str, err: impl std::fmt::Display) -> OmniError {
597 OmniError::manifest_internal(format!("storage {} failed for '{}': {}", action, uri, err))
598}
599
600fn normalize_local_path(path: &Path) -> String {
601 let raw = path.as_os_str().to_string_lossy();
602 if raw == "/" {
603 return raw.to_string();
604 }
605 trim_trailing_slashes(&raw)
606}
607
/// Removes every trailing `/` from `value`, unless that would leave nothing,
/// in which case `value` is returned unchanged.
fn trim_trailing_slashes(value: &str) -> String {
    let kept = match value.trim_end_matches('/') {
        "" => value,
        stripped => stripped,
    };
    kept.to_string()
}
616
/// Reports whether the environment variable `key` holds a truthy value.
///
/// Truthy values are `1`, `true`, `yes` and `on`, compared ASCII
/// case-insensitively, so `Yes`, `On` and `tRuE` are accepted alongside the
/// lowercase and uppercase spellings. An unset variable, a non-Unicode value,
/// or any other string yields `false`.
fn env_var_truthy(key: &str) -> bool {
    const TRUTHY: [&str; 4] = ["1", "true", "yes", "on"];

    env::var(key)
        .map(|value| {
            TRUTHY
                .iter()
                .any(|candidate| value.eq_ignore_ascii_case(candidate))
        })
        .unwrap_or(false)
}
623
#[cfg(test)]
mod tests {
    use super::*;

    /// Backend-agnostic behavior checks, run against every adapter that can be
    /// constructed in a unit test. All objects live under `{root}/contract`.
    /// The sections depend on state left behind by earlier ones, so statement
    /// order matters.
    async fn contract_suite(adapter: &dyn StorageAdapter, root: &str) {
        // Unconditional writes: the second write replaces the first.
        let a = format!("{root}/contract/a.json");
        adapter.write_text(&a, "v1").await.unwrap();
        assert_eq!(adapter.read_text(&a).await.unwrap(), "v1");
        adapter.write_text(&a, "v2").await.unwrap();
        assert_eq!(adapter.read_text(&a).await.unwrap(), "v2");

        // `exists` is true for an object and for a non-empty prefix, false otherwise.
        assert!(adapter.exists(&a).await.unwrap());
        assert!(
            !adapter
                .exists(&format!("{root}/contract/missing.json"))
                .await
                .unwrap()
        );
        assert!(adapter.exists(&format!("{root}/contract")).await.unwrap());

        // Create-if-absent: only the first writer wins and its content is kept.
        let claim = format!("{root}/contract/claim.json");
        assert!(adapter.write_text_if_absent(&claim, "first").await.unwrap());
        assert!(!adapter.write_text_if_absent(&claim, "second").await.unwrap());
        assert_eq!(adapter.read_text(&claim).await.unwrap(), "first");

        // Compare-and-swap: a fresh token succeeds and yields a new token; the
        // stale token is rejected, and so is a write to a missing object.
        let state = format!("{root}/contract/state.json");
        adapter.write_text(&state, "s1").await.unwrap();
        let (text, v1) = adapter.read_text_versioned(&state).await.unwrap();
        assert_eq!(text, "s1");
        let v2 = adapter
            .write_text_if_match(&state, "s2", &v1)
            .await
            .unwrap()
            .expect("fresh token must win");
        assert_ne!(v2, v1);
        assert!(
            adapter
                .write_text_if_match(&state, "s3", &v1)
                .await
                .unwrap()
                .is_none()
        );
        assert_eq!(adapter.read_text(&state).await.unwrap(), "s2");
        assert!(
            adapter
                .write_text_if_match(&format!("{root}/contract/absent.json"), "x", &v1)
                .await
                .unwrap()
                .is_none()
        );

        // Rename onto an existing object replaces it and removes the source.
        let src = format!("{root}/contract/src.json");
        adapter.write_text(&src, "moved").await.unwrap();
        adapter.rename_text(&src, &a).await.unwrap();
        assert_eq!(adapter.read_text(&a).await.unwrap(), "moved");
        assert!(!adapter.exists(&src).await.unwrap());

        // `list_dir` returns only direct children: not the nested `sub/` entry,
        // and not the sibling `list_log/` prefix that shares a name prefix.
        let dir_uri = format!("{root}/contract/list");
        adapter
            .write_text(&format!("{dir_uri}/one.json"), "1")
            .await
            .unwrap();
        adapter
            .write_text(&format!("{dir_uri}/two.json"), "2")
            .await
            .unwrap();
        adapter
            .write_text(&format!("{dir_uri}/sub/three.json"), "3")
            .await
            .unwrap();
        adapter
            .write_text(&format!("{root}/contract/list_log/x.json"), "x")
            .await
            .unwrap();
        let mut listed = adapter.list_dir(&dir_uri).await.unwrap();
        listed.sort();
        assert_eq!(
            listed,
            vec![
                format!("{dir_uri}/one.json"),
                format!("{dir_uri}/two.json")
            ]
        );
        // Every listed URI must be readable as returned.
        for uri in &listed {
            adapter.read_text(uri).await.unwrap();
        }
        // Listing a prefix that does not exist yields an empty result, not an error.
        assert!(
            adapter
                .list_dir(&format!("{root}/contract/nope"))
                .await
                .unwrap()
                .is_empty()
        );

        // Delete is idempotent: the second call on a missing object succeeds.
        adapter.delete(&claim).await.unwrap();
        adapter.delete(&claim).await.unwrap();
        assert!(!adapter.exists(&claim).await.unwrap());

        // `delete_prefix` removes everything under the prefix and is idempotent.
        adapter
            .delete_prefix(&format!("{root}/contract"))
            .await
            .unwrap();
        assert!(!adapter.exists(&a).await.unwrap());
        assert!(!adapter.exists(&format!("{root}/contract")).await.unwrap());
        adapter
            .delete_prefix(&format!("{root}/contract"))
            .await
            .unwrap();
    }

    /// Runs the contract suite against the local filesystem adapter in a temp dir.
    #[tokio::test]
    async fn contract_suite_local() {
        let dir = tempfile::tempdir().unwrap();
        let adapter = ObjectStorageAdapter::local();
        contract_suite(&adapter, dir.path().to_str().unwrap()).await;
    }

    /// Runs the contract suite against the in-memory adapter.
    #[tokio::test]
    async fn contract_suite_in_memory() {
        let adapter = ObjectStorageAdapter::in_memory();
        contract_suite(&adapter, "mem-root").await;
    }

    /// After `write_text_if_absent` returns, a synchronous read of the file
    /// must already see the full payload (1000 files of 8 KiB each).
    #[tokio::test]
    async fn local_write_text_if_absent_is_read_visible_on_return() {
        let dir = tempfile::tempdir().unwrap();
        let adapter = ObjectStorageAdapter::local();
        let payload = "x".repeat(8 * 1024);
        for i in 0..1000 {
            let path = dir.path().join(format!("obj-{i}.json"));
            let uri = format!("{}", path.display());
            assert!(adapter.write_text_if_absent(&uri, &payload).await.unwrap());
            let read = std::fs::read_to_string(&path).unwrap();
            assert_eq!(
                read.len(),
                payload.len(),
                "iteration {i}: write_text_if_absent returned before its \
                 contents reached the file"
            );
        }
    }

    /// Same visibility check through `storage_for_uri` with `file://` URIs, on
    /// the multi-threaded runtime and with 64 KiB payloads.
    #[tokio::test(flavor = "multi_thread")]
    async fn write_text_if_absent_is_read_consistent_immediately() {
        let dir = tempfile::tempdir().unwrap();
        let adapter = storage_for_uri(&format!("file://{}", dir.path().display())).unwrap();
        let payload = "x".repeat(64 * 1024);
        for i in 0..200 {
            let uri = format!("file://{}/f{}.json", dir.path().display(), i);
            assert!(adapter.write_text_if_absent(&uri, &payload).await.unwrap());
            let read = std::fs::read_to_string(dir.path().join(format!("f{i}.json"))).unwrap();
            assert_eq!(read.len(), payload.len(), "iteration {i}: short read");
        }
    }

    /// On the local backend an empty directory does not "exist", while a
    /// directory containing at least one file does.
    #[tokio::test]
    async fn local_exists_is_object_semantics_for_directories() {
        let dir = tempfile::tempdir().unwrap();
        let probe = dir.path().join("maybe-dataset");
        let adapter = ObjectStorageAdapter::local();
        std::fs::create_dir(&probe).unwrap();
        assert!(
            !adapter.exists(probe.to_str().unwrap()).await.unwrap(),
            "an empty directory is not an object"
        );
        std::fs::write(probe.join("1.manifest"), "m").unwrap();
        assert!(
            adapter.exists(probe.to_str().unwrap()).await.unwrap(),
            "a non-empty prefix exists (the Lance dataset-root probe shape)"
        );
    }

    /// `list_dir` echoes the caller's anchor form (plain path or `file://`)
    /// and the returned URIs stay readable when the path contains a space.
    #[tokio::test]
    async fn local_list_round_trips_file_scheme_and_spaces() {
        let dir = tempfile::tempdir().unwrap();
        let root = dir.path().join("with space");
        let adapter = ObjectStorageAdapter::local();
        let plain = format!("{}/x.json", root.display());
        adapter.write_text(&plain, "x").await.unwrap();

        // Plain-path anchor: results are plain paths.
        let listed = adapter.list_dir(root.to_str().unwrap()).await.unwrap();
        assert_eq!(listed, vec![plain.clone()]);
        assert_eq!(adapter.read_text(&listed[0]).await.unwrap(), "x");

        // `file://` anchor: results keep the `file://` form.
        let file_anchor = format!("file://{}", root.display());
        let listed = adapter.list_dir(&file_anchor).await.unwrap();
        assert_eq!(listed, vec![format!("{file_anchor}/x.json")]);
        assert_eq!(adapter.read_text(&listed[0]).await.unwrap(), "x");
    }

    /// `..` segments are resolved before the path reaches the store, so the
    /// file lands at the lexically resolved location.
    #[tokio::test]
    async fn local_paths_with_dot_segments_are_absolutized() {
        let dir = tempfile::tempdir().unwrap();
        let adapter = ObjectStorageAdapter::local();
        let uri = format!("{}/sub/../dotted.json", dir.path().display());
        adapter.write_text(&uri, "x").await.unwrap();
        assert_eq!(adapter.read_text(&uri).await.unwrap(), "x");
        assert!(dir.path().join("dotted.json").exists());
    }

    /// Renaming into a directory that does not exist yet succeeds.
    #[tokio::test]
    async fn local_rename_creates_missing_destination_parents() {
        let dir = tempfile::tempdir().unwrap();
        let adapter = ObjectStorageAdapter::local();
        let src = format!("{}/src.json", dir.path().display());
        adapter.write_text(&src, "x").await.unwrap();
        let dst = format!("{}/new-sub/dst.json", dir.path().display());
        adapter.rename_text(&src, &dst).await.unwrap();
        assert_eq!(adapter.read_text(&dst).await.unwrap(), "x");
    }

    /// Plain paths and `file://` URIs select `Local`; `s3://` selects `S3`.
    #[test]
    fn storage_backend_selection_is_scheme_aware() {
        assert_eq!(storage_kind_for_uri("/tmp/graph"), StorageKind::Local);
        assert_eq!(
            storage_kind_for_uri("file:///tmp/graph"),
            StorageKind::Local
        );
        assert_eq!(
            storage_kind_for_uri("s3://omnigraph-preview/graph"),
            StorageKind::S3
        );
    }

    /// Trailing slashes are dropped; `file://` roots become plain paths while
    /// S3 roots keep their scheme.
    #[test]
    fn normalize_root_uri_preserves_local_and_s3_shapes() {
        assert_eq!(
            normalize_root_uri("/tmp/omnigraph/").unwrap(),
            "/tmp/omnigraph"
        );
        assert_eq!(
            normalize_root_uri("file:///tmp/omnigraph/").unwrap(),
            "/tmp/omnigraph"
        );
        assert_eq!(
            normalize_root_uri("s3://bucket/prefix/").unwrap(),
            "s3://bucket/prefix"
        );
    }

    /// Joining works for plain-path, `file://` and `s3://` roots.
    #[test]
    fn join_uri_handles_local_file_and_s3_roots() {
        assert_eq!(
            join_uri("/tmp/omnigraph", "_schema.pg"),
            "/tmp/omnigraph/_schema.pg"
        );
        assert_eq!(
            join_uri("file:///tmp/omnigraph", "_schema.pg"),
            "/tmp/omnigraph/_schema.pg"
        );
        assert_eq!(
            join_uri("s3://bucket/prefix", "_schema.pg"),
            "s3://bucket/prefix/_schema.pg"
        );
    }

    /// The URL host becomes the bucket and the path (no leading slash) the key.
    #[test]
    fn parse_s3_uri_splits_bucket_and_key() {
        let location = parse_s3_uri("s3://bucket/graph/_schema.pg").unwrap();
        assert_eq!(location.bucket, "bucket");
        assert_eq!(location.key, "graph/_schema.pg");
    }
}