// zarr_datafusion/reader/storage.rs

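//! Storage backend factory for local and cloud object stores.
//!
//! Supports:
//! - Local filesystem: plain paths or `file:///path`
//! - AWS S3: `s3://bucket/path`
//! - Google Cloud Storage: `gs://bucket/path`
//!
//! Local locations can be parsed with `StorageLocation::parse`, but
//! `create_async_store` only builds remote (S3/GCS) backends; local paths must
//! be opened with a synchronous filesystem store.
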
use std::sync::Arc;

use tracing::{debug, info, instrument};
use url::Url;
use zarrs::storage::AsyncReadableListableStorage;
use zarrs_object_store::object_store::aws::AmazonS3Builder;
use zarrs_object_store::object_store::gcp::GoogleCloudStorageBuilder;
use zarrs_object_store::object_store::path::Path as ObjectPath;
use zarrs_object_store::object_store::Error as ObjectStoreError;
use zarrs_object_store::AsyncObjectStore;

18/// Error type for storage operations
19#[derive(Debug)]
20pub enum StorageError {
21    InvalidUrl(String),
22    ObjectStore(ObjectStoreError),
23    Filesystem(std::io::Error),
24}
25
26impl std::fmt::Display for StorageError {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        match self {
29            StorageError::InvalidUrl(msg) => write!(f, "Invalid URL: {}", msg),
30            StorageError::ObjectStore(e) => write!(f, "Object store error: {}", e),
31            StorageError::Filesystem(e) => write!(f, "Filesystem error: {}", e),
32        }
33    }
34}
35
36impl std::error::Error for StorageError {}
37
38impl From<ObjectStoreError> for StorageError {
39    fn from(e: ObjectStoreError) -> Self {
40        StorageError::ObjectStore(e)
41    }
42}
43
44impl From<std::io::Error> for StorageError {
45    fn from(e: std::io::Error) -> Self {
46        StorageError::Filesystem(e)
47    }
48}
49
50/// Storage location with backend type and path information
51#[derive(Debug, Clone)]
52pub struct StorageLocation {
53    /// The original URL/path string
54    pub url: String,
55    /// Path within the store (for object stores, this is the prefix)
56    pub path: String,
57    /// Whether this is a remote (cloud) store
58    pub is_remote: bool,
59}
60
61impl StorageLocation {
62    /// Parse a URL or path into a StorageLocation
63    pub fn parse(location: &str) -> Result<Self, StorageError> {
64        if location.starts_with("s3://") || location.starts_with("gs://") {
65            // Cloud URL - extract path from URL
66            let url = Url::parse(location).map_err(|e| StorageError::InvalidUrl(e.to_string()))?;
67            let path = url.path().trim_start_matches('/').to_string();
68            Ok(StorageLocation {
69                url: location.to_string(),
70                path,
71                is_remote: true,
72            })
73        } else if location.starts_with("file://") {
74            // Local file URL
75            let path = location.trim_start_matches("file://").to_string();
76            Ok(StorageLocation {
77                url: location.to_string(),
78                path,
79                is_remote: false,
80            })
81        } else {
82            // Plain local path
83            Ok(StorageLocation {
84                url: location.to_string(),
85                path: location.to_string(),
86                is_remote: false,
87            })
88        }
89    }
90}
91
92/// Create an async storage backend from a URL or path.
93///
94/// # Supported URL schemes
95///
96/// - `s3://bucket/path` - AWS S3 (credentials from environment)
97/// - `gs://bucket/path` - Google Cloud Storage (credentials from environment or anonymous)
98/// - `file:///path` or plain path - Local filesystem
99///
100/// # Environment Variables
101///
102/// ## S3
103/// - `AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY` - Credentials
104/// - `AWS_DEFAULT_REGION` - Region
105/// - `AWS_ENDPOINT` - Custom endpoint (for S3-compatible services)
106///
107/// ## GCS
108/// - `GOOGLE_SERVICE_ACCOUNT` - Path to service account JSON
109/// - Or uses Application Default Credentials
110/// - Falls back to anonymous access for public buckets
111#[instrument(level = "debug")]
112pub async fn create_async_store(
113    location: &str,
114) -> Result<(AsyncReadableListableStorage, ObjectPath), StorageError> {
115    debug!("Parsing storage location");
116    let _loc = StorageLocation::parse(location)?;
117    debug!(is_remote = _loc.is_remote, path = %_loc.path, "Location parsed");
118
119    if location.starts_with("s3://") {
120        info!("Creating S3 store");
121        create_s3_store(location).await
122    } else if location.starts_with("gs://") {
123        info!("Creating GCS store");
124        create_gcs_store(location).await
125    } else {
126        debug!("Local filesystem - not supported for async");
127        Err(StorageError::InvalidUrl(
128            "Local filesystem should use synchronous FilesystemStore".to_string(),
129        ))
130    }
131}
132
133/// Create an S3 storage backend
134async fn create_s3_store(
135    url: &str,
136) -> Result<(AsyncReadableListableStorage, ObjectPath), StorageError> {
137    let parsed = Url::parse(url).map_err(|e| StorageError::InvalidUrl(e.to_string()))?;
138    let bucket = parsed
139        .host_str()
140        .ok_or_else(|| StorageError::InvalidUrl("Missing bucket in S3 URL".to_string()))?;
141    let path = parsed.path().trim_start_matches('/');
142
143    let store = AmazonS3Builder::from_env()
144        .with_bucket_name(bucket)
145        .build()?;
146
147    let async_store: AsyncReadableListableStorage = Arc::new(AsyncObjectStore::new(store));
148    let object_path = ObjectPath::from(path);
149
150    Ok((async_store, object_path))
151}
152
153/// Create a GCS storage backend
154#[instrument(level = "debug")]
155async fn create_gcs_store(
156    url: &str,
157) -> Result<(AsyncReadableListableStorage, ObjectPath), StorageError> {
158    let parsed = Url::parse(url).map_err(|e| StorageError::InvalidUrl(e.to_string()))?;
159    let bucket = parsed
160        .host_str()
161        .ok_or_else(|| StorageError::InvalidUrl("Missing bucket in GCS URL".to_string()))?;
162    let path = parsed.path().trim_start_matches('/');
163    debug!(bucket = bucket, path = path, "Parsed GCS URL");
164
165    // For public buckets, try anonymous access first (faster)
166    // Then fall back to credentials from environment
167    debug!("Trying anonymous access for public bucket");
168    let store = GoogleCloudStorageBuilder::new()
169        .with_bucket_name(bucket)
170        .with_skip_signature(true)
171        .build()
172        .or_else(|_| {
173            debug!("Anonymous access failed, trying credentials from environment");
174            GoogleCloudStorageBuilder::from_env()
175                .with_bucket_name(bucket)
176                .build()
177        })?;
178
179    let async_store: AsyncReadableListableStorage = Arc::new(AsyncObjectStore::new(store));
180    let object_path = ObjectPath::from(path);
181    info!(
182        bucket = bucket,
183        path = path,
184        "GCS store created successfully"
185    );
186
187    Ok((async_store, object_path))
188}
189
/// Return `true` when `location` starts with a cloud object-store scheme
/// (`s3://` or `gs://`). The comparison is case-sensitive.
pub fn is_remote_url(location: &str) -> bool {
    const REMOTE_SCHEMES: [&str; 2] = ["s3://", "gs://"];
    REMOTE_SCHEMES
        .iter()
        .any(|&scheme| location.starts_with(scheme))
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Parse `input`, panicking if it is rejected.
    fn parse_ok(input: &str) -> StorageLocation {
        StorageLocation::parse(input).unwrap()
    }

    #[test]
    fn test_parse_s3_url() {
        let StorageLocation { is_remote, path, .. } = parse_ok("s3://my-bucket/path/to/data.zarr");
        assert!(is_remote);
        assert_eq!(path, "path/to/data.zarr");
    }

    #[test]
    fn test_parse_gcs_url() {
        let StorageLocation { is_remote, path, .. } = parse_ok("gs://my-bucket/path/to/data.zarr");
        assert!(is_remote);
        assert_eq!(path, "path/to/data.zarr");
    }

    #[test]
    fn test_parse_local_path() {
        let StorageLocation { is_remote, path, .. } = parse_ok("/data/synthetic.zarr");
        assert!(!is_remote);
        assert_eq!(path, "/data/synthetic.zarr");
    }

    #[test]
    fn test_parse_file_url() {
        let StorageLocation { is_remote, path, .. } = parse_ok("file:///data/synthetic.zarr");
        assert!(!is_remote);
        assert_eq!(path, "/data/synthetic.zarr");
    }

    #[test]
    fn test_is_remote_url() {
        for remote in ["s3://bucket/path", "gs://bucket/path"] {
            assert!(is_remote_url(remote));
        }
        for local in ["/local/path", "file:///local/path"] {
            assert!(!is_remote_url(local));
        }
    }
}