zarr_datafusion/reader/
storage.rs1use std::sync::Arc;
9use tracing::{debug, info, instrument};
10use url::Url;
11use zarrs::storage::AsyncReadableListableStorage;
12use zarrs_object_store::object_store::aws::AmazonS3Builder;
13use zarrs_object_store::object_store::gcp::GoogleCloudStorageBuilder;
14use zarrs_object_store::object_store::path::Path as ObjectPath;
15use zarrs_object_store::object_store::Error as ObjectStoreError;
16use zarrs_object_store::AsyncObjectStore;
17
18#[derive(Debug)]
20pub enum StorageError {
21 InvalidUrl(String),
22 ObjectStore(ObjectStoreError),
23 Filesystem(std::io::Error),
24}
25
26impl std::fmt::Display for StorageError {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 match self {
29 StorageError::InvalidUrl(msg) => write!(f, "Invalid URL: {}", msg),
30 StorageError::ObjectStore(e) => write!(f, "Object store error: {}", e),
31 StorageError::Filesystem(e) => write!(f, "Filesystem error: {}", e),
32 }
33 }
34}
35
36impl std::error::Error for StorageError {}
37
38impl From<ObjectStoreError> for StorageError {
39 fn from(e: ObjectStoreError) -> Self {
40 StorageError::ObjectStore(e)
41 }
42}
43
44impl From<std::io::Error> for StorageError {
45 fn from(e: std::io::Error) -> Self {
46 StorageError::Filesystem(e)
47 }
48}
49
/// A storage location string split into the parts produced by
/// [`StorageLocation::parse`].
#[derive(Clone, Debug)]
pub struct StorageLocation {
    /// The location exactly as it was passed to `parse`.
    pub url: String,
    /// For `s3://` / `gs://` locations, the URL path without leading slashes;
    /// for `file://` locations, the text after the scheme; otherwise the
    /// location string itself.
    pub path: String,
    /// `true` for `s3://` and `gs://` locations, `false` for everything else.
    pub is_remote: bool,
}
60
61impl StorageLocation {
62 pub fn parse(location: &str) -> Result<Self, StorageError> {
64 if location.starts_with("s3://") || location.starts_with("gs://") {
65 let url = Url::parse(location).map_err(|e| StorageError::InvalidUrl(e.to_string()))?;
67 let path = url.path().trim_start_matches('/').to_string();
68 Ok(StorageLocation {
69 url: location.to_string(),
70 path,
71 is_remote: true,
72 })
73 } else if location.starts_with("file://") {
74 let path = location.trim_start_matches("file://").to_string();
76 Ok(StorageLocation {
77 url: location.to_string(),
78 path,
79 is_remote: false,
80 })
81 } else {
82 Ok(StorageLocation {
84 url: location.to_string(),
85 path: location.to_string(),
86 is_remote: false,
87 })
88 }
89 }
90}
91
92#[instrument(level = "debug")]
112pub async fn create_async_store(
113 location: &str,
114) -> Result<(AsyncReadableListableStorage, ObjectPath), StorageError> {
115 debug!("Parsing storage location");
116 let _loc = StorageLocation::parse(location)?;
117 debug!(is_remote = _loc.is_remote, path = %_loc.path, "Location parsed");
118
119 if location.starts_with("s3://") {
120 info!("Creating S3 store");
121 create_s3_store(location).await
122 } else if location.starts_with("gs://") {
123 info!("Creating GCS store");
124 create_gcs_store(location).await
125 } else {
126 debug!("Local filesystem - not supported for async");
127 Err(StorageError::InvalidUrl(
128 "Local filesystem should use synchronous FilesystemStore".to_string(),
129 ))
130 }
131}
132
133async fn create_s3_store(
135 url: &str,
136) -> Result<(AsyncReadableListableStorage, ObjectPath), StorageError> {
137 let parsed = Url::parse(url).map_err(|e| StorageError::InvalidUrl(e.to_string()))?;
138 let bucket = parsed
139 .host_str()
140 .ok_or_else(|| StorageError::InvalidUrl("Missing bucket in S3 URL".to_string()))?;
141 let path = parsed.path().trim_start_matches('/');
142
143 let store = AmazonS3Builder::from_env()
144 .with_bucket_name(bucket)
145 .build()?;
146
147 let async_store: AsyncReadableListableStorage = Arc::new(AsyncObjectStore::new(store));
148 let object_path = ObjectPath::from(path);
149
150 Ok((async_store, object_path))
151}
152
153#[instrument(level = "debug")]
155async fn create_gcs_store(
156 url: &str,
157) -> Result<(AsyncReadableListableStorage, ObjectPath), StorageError> {
158 let parsed = Url::parse(url).map_err(|e| StorageError::InvalidUrl(e.to_string()))?;
159 let bucket = parsed
160 .host_str()
161 .ok_or_else(|| StorageError::InvalidUrl("Missing bucket in GCS URL".to_string()))?;
162 let path = parsed.path().trim_start_matches('/');
163 debug!(bucket = bucket, path = path, "Parsed GCS URL");
164
165 debug!("Trying anonymous access for public bucket");
168 let store = GoogleCloudStorageBuilder::new()
169 .with_bucket_name(bucket)
170 .with_skip_signature(true)
171 .build()
172 .or_else(|_| {
173 debug!("Anonymous access failed, trying credentials from environment");
174 GoogleCloudStorageBuilder::from_env()
175 .with_bucket_name(bucket)
176 .build()
177 })?;
178
179 let async_store: AsyncReadableListableStorage = Arc::new(AsyncObjectStore::new(store));
180 let object_path = ObjectPath::from(path);
181 info!(
182 bucket = bucket,
183 path = path,
184 "GCS store created successfully"
185 );
186
187 Ok((async_store, object_path))
188}
189
/// Returns `true` when `location` begins with a remote object-store scheme
/// (`s3://` or `gs://`). The comparison is case-sensitive.
pub fn is_remote_url(location: &str) -> bool {
    const REMOTE_PREFIXES: [&str; 2] = ["s3://", "gs://"];
    REMOTE_PREFIXES
        .iter()
        .any(|&prefix| location.starts_with(prefix))
}
194
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `input` and returns `(path, is_remote)`; panics if parsing fails.
    fn path_and_remote_flag(input: &str) -> (String, bool) {
        let loc = StorageLocation::parse(input).unwrap();
        (loc.path, loc.is_remote)
    }

    #[test]
    fn test_parse_s3_url() {
        let (path, is_remote) = path_and_remote_flag("s3://my-bucket/path/to/data.zarr");
        assert!(is_remote);
        assert_eq!(path, "path/to/data.zarr");
    }

    #[test]
    fn test_parse_gcs_url() {
        let (path, is_remote) = path_and_remote_flag("gs://my-bucket/path/to/data.zarr");
        assert!(is_remote);
        assert_eq!(path, "path/to/data.zarr");
    }

    #[test]
    fn test_parse_local_path() {
        let (path, is_remote) = path_and_remote_flag("/data/synthetic.zarr");
        assert!(!is_remote);
        assert_eq!(path, "/data/synthetic.zarr");
    }

    #[test]
    fn test_parse_file_url() {
        let (path, is_remote) = path_and_remote_flag("file:///data/synthetic.zarr");
        assert!(!is_remote);
        assert_eq!(path, "/data/synthetic.zarr");
    }

    #[test]
    fn test_is_remote_url() {
        for &remote in &["s3://bucket/path", "gs://bucket/path"] {
            assert!(is_remote_url(remote));
        }
        for &local in &["/local/path", "file:///local/path"] {
            assert!(!is_remote_url(local));
        }
    }
}