Skip to main content

uni_store/
cloud.rs

// SPDX-License-Identifier: Apache-2.0
// Copyright 2024-2026 Dragonscale Team

//! Cloud storage builders for S3, GCS, and Azure.
//!
//! This module provides utilities to construct `ObjectStore` instances
//! for various cloud providers. It supports both explicit configuration
//! (`build_cloud_store`) and URL-based store creation (`build_store_from_url`).
//!
//! # Examples
//!
//! ```ignore
//! use uni_common::CloudStorageConfig;
//! use uni_store::cloud::build_cloud_store;
//!
//! let config = CloudStorageConfig::s3_from_env("my-bucket");
//! let store = build_cloud_store(&config)?;
//! ```

20use std::sync::Arc;
21
22use anyhow::{Result, anyhow};
23use object_store::ObjectStore;
24use object_store::ObjectStoreExt;
25use object_store::aws::AmazonS3Builder;
26use object_store::azure::MicrosoftAzureBuilder;
27use object_store::gcp::GoogleCloudStorageBuilder;
28use object_store::local::LocalFileSystem;
29use object_store::path::Path;
30use uni_common::CloudStorageConfig;
31
32/// Builds an `ObjectStore` from a `CloudStorageConfig`.
33///
34/// Creates the appropriate cloud storage client based on the configuration
35/// variant, applying all specified credentials and settings.
36///
37/// # Errors
38///
39/// Returns an error if the cloud store cannot be built due to invalid
40/// credentials, missing required parameters, or network issues.
41///
42/// # Examples
43///
44/// ```ignore
45/// let config = CloudStorageConfig::S3 {
46///     bucket: "my-bucket".to_string(),
47///     region: Some("us-east-1".to_string()),
48///     endpoint: None,
49///     access_key_id: None,
50///     secret_access_key: None,
51///     session_token: None,
52///     virtual_hosted_style: false,
53/// };
54/// let store = build_cloud_store(&config)?;
55/// ```
56pub fn build_cloud_store(config: &CloudStorageConfig) -> Result<Arc<dyn ObjectStore>> {
57    match config {
58        CloudStorageConfig::S3 {
59            bucket,
60            region,
61            endpoint,
62            access_key_id,
63            secret_access_key,
64            session_token,
65            virtual_hosted_style,
66        } => {
67            let mut builder = AmazonS3Builder::new().with_bucket_name(bucket);
68
69            if let Some(r) = region {
70                builder = builder.with_region(r);
71            }
72
73            if let Some(e) = endpoint {
74                builder = builder.with_endpoint(e);
75            }
76
77            if let Some(key) = access_key_id {
78                builder = builder.with_access_key_id(key);
79            }
80
81            if let Some(secret) = secret_access_key {
82                builder = builder.with_secret_access_key(secret);
83            }
84
85            if let Some(token) = session_token {
86                builder = builder.with_token(token);
87            }
88
89            // Path-style (false) is required for LocalStack and MinIO
90            builder = builder.with_virtual_hosted_style_request(*virtual_hosted_style);
91
92            // Allow HTTP for local development (LocalStack, MinIO)
93            if endpoint.as_ref().is_some_and(|e| e.starts_with("http://")) {
94                builder = builder.with_allow_http(true);
95            }
96
97            let store = builder.build()?;
98            Ok(Arc::new(store))
99        }
100        CloudStorageConfig::Gcs {
101            bucket,
102            service_account_path,
103            service_account_key,
104        } => {
105            let mut builder = GoogleCloudStorageBuilder::new().with_bucket_name(bucket);
106
107            if let Some(path) = service_account_path {
108                builder = builder.with_service_account_path(path);
109            }
110
111            if let Some(key) = service_account_key {
112                builder = builder.with_service_account_key(key);
113            }
114
115            let store = builder.build()?;
116            Ok(Arc::new(store))
117        }
118        CloudStorageConfig::Azure {
119            container,
120            account,
121            access_key,
122            sas_token,
123        } => {
124            let mut builder = MicrosoftAzureBuilder::new()
125                .with_container_name(container)
126                .with_account(account);
127
128            if let Some(key) = access_key {
129                builder = builder.with_access_key(key);
130            }
131
132            if let Some(token) = sas_token {
133                // Parse SAS token query string into key-value pairs
134                // SAS tokens look like: sv=2019-02-02&ss=b&srt=sco&...
135                let query_pairs: Vec<(String, String)> = token
136                    .trim_start_matches('?')
137                    .split('&')
138                    .filter_map(|pair| {
139                        let mut parts = pair.splitn(2, '=');
140                        let key = parts.next()?;
141                        let value = parts.next().unwrap_or("");
142                        Some((key.to_string(), value.to_string()))
143                    })
144                    .collect();
145                builder = builder.with_sas_authorization(query_pairs);
146            }
147
148            let store = builder.build()?;
149            Ok(Arc::new(store))
150        }
151    }
152}
153
154/// Parses a URL and returns the appropriate `ObjectStore` and path prefix.
155///
156/// Supports the following URL schemes:
157/// - `s3://bucket/path` - Amazon S3
158/// - `gs://bucket/path` - Google Cloud Storage
159/// - `az://account/container/path` - Azure Blob Storage
160/// - `file:///path` or `/path` - Local filesystem
161///
162/// # Errors
163///
164/// Returns an error if the URL scheme is not recognized or if the store
165/// cannot be created.
166///
167/// # Examples
168///
169/// ```ignore
170/// let (store, prefix) = build_store_from_url("s3://my-bucket/data")?;
171/// ```
172pub fn build_store_from_url(url: &str) -> Result<(Arc<dyn ObjectStore>, Path)> {
173    // Handle local paths directly
174    if !url.contains("://") {
175        let local_path = std::path::Path::new(url);
176        let store = Arc::new(LocalFileSystem::new_with_prefix(local_path)?);
177        return Ok((store, Path::from("")));
178    }
179
180    if let Some(path) = url.strip_prefix("file://") {
181        let local_path = std::path::Path::new(path);
182        let store = Arc::new(LocalFileSystem::new_with_prefix(local_path)?);
183        return Ok((store, Path::from("")));
184    }
185
186    let parsed = url::Url::parse(url).map_err(|e| anyhow!("Invalid URL '{}': {}", url, e))?;
187    let scheme = parsed.scheme();
188
189    match scheme {
190        "s3" => {
191            let bucket = parsed
192                .host_str()
193                .ok_or_else(|| anyhow!("S3 URL missing bucket: {}", url))?;
194            let prefix = parsed.path().trim_start_matches('/');
195
196            let config = CloudStorageConfig::s3_from_env(bucket);
197            let store = build_cloud_store(&config)?;
198
199            Ok((store, Path::from(prefix)))
200        }
201        "gs" => {
202            let bucket = parsed
203                .host_str()
204                .ok_or_else(|| anyhow!("GCS URL missing bucket: {}", url))?;
205            let prefix = parsed.path().trim_start_matches('/');
206
207            let config = CloudStorageConfig::gcs_from_env(bucket);
208            let store = build_cloud_store(&config)?;
209
210            Ok((store, Path::from(prefix)))
211        }
212        "az" | "azure" | "abfs" | "abfss" => {
213            // Format: az://account/container/path
214            let account = parsed
215                .host_str()
216                .ok_or_else(|| anyhow!("Azure URL missing account: {}", url))?;
217            let path_parts: Vec<&str> = parsed
218                .path()
219                .trim_start_matches('/')
220                .splitn(2, '/')
221                .collect();
222
223            if path_parts.is_empty() || path_parts[0].is_empty() {
224                return Err(anyhow!("Azure URL missing container: {}", url));
225            }
226
227            let container = path_parts[0];
228            let prefix = path_parts.get(1).unwrap_or(&"");
229
230            let config = CloudStorageConfig::Azure {
231                container: container.to_string(),
232                account: account.to_string(),
233                access_key: std::env::var("AZURE_STORAGE_ACCESS_KEY").ok(),
234                sas_token: std::env::var("AZURE_STORAGE_SAS_TOKEN").ok(),
235            };
236
237            let store = build_cloud_store(&config)?;
238
239            Ok((store, Path::from(*prefix)))
240        }
241        _ => Err(anyhow!(
242            "Unsupported URL scheme '{}'. Supported: s3://, gs://, az://",
243            scheme
244        )),
245    }
246}
247
248/// Copies all objects from a source prefix to a destination prefix.
249///
250/// This function streams objects from the source store and uploads them
251/// to the destination store, preserving the relative path structure.
252///
253/// # Errors
254///
255/// Returns an error if any object cannot be read or written.
256pub async fn copy_store_prefix(
257    src_store: &Arc<dyn ObjectStore>,
258    dst_store: &Arc<dyn ObjectStore>,
259    src_prefix: &Path,
260    dst_prefix: &Path,
261) -> Result<usize> {
262    use futures::TryStreamExt;
263
264    let mut copied_count = 0usize;
265    let mut stream = src_store.list(Some(src_prefix));
266
267    while let Some(meta) = stream.try_next().await? {
268        let bytes = src_store.get(&meta.location).await?.bytes().await?;
269
270        // Calculate relative path from source prefix
271        let relative = meta
272            .location
273            .as_ref()
274            .strip_prefix(src_prefix.as_ref())
275            .unwrap_or(meta.location.as_ref())
276            .trim_start_matches('/');
277
278        let dst_path = if dst_prefix.as_ref().is_empty() {
279            Path::from(relative)
280        } else {
281            Path::from(format!("{}/{}", dst_prefix.as_ref(), relative))
282        };
283
284        dst_store.put(&dst_path, bytes.into()).await?;
285        copied_count += 1;
286    }
287
288    Ok(copied_count)
289}
290
/// Reports whether `url` names a cloud storage location.
///
/// This is a case-sensitive prefix check against the cloud schemes handled by
/// `build_store_from_url`: `s3://`, `gs://`, `az://`, `azure://`, `abfs://`
/// and `abfss://`. Local paths and `file://` URLs yield `false`.
#[must_use]
pub fn is_cloud_url(url: &str) -> bool {
    const CLOUD_SCHEME_PREFIXES: [&str; 6] =
        ["s3://", "gs://", "az://", "azure://", "abfs://", "abfss://"];

    CLOUD_SCHEME_PREFIXES
        .iter()
        .any(|&prefix| url.starts_with(prefix))
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_is_cloud_url() {
        assert!(is_cloud_url("s3://bucket/path"));
        assert!(is_cloud_url("gs://bucket/path"));
        assert!(is_cloud_url("az://account/container"));
        assert!(is_cloud_url("azure://account/container"));
        assert!(!is_cloud_url("/local/path"));
        assert!(!is_cloud_url("file:///local/path"));
        assert!(!is_cloud_url("./relative/path"));
    }

    #[test]
    fn test_is_cloud_url_abfs_schemes() {
        assert!(is_cloud_url("abfs://container@account.dfs.core.windows.net/path"));
        assert!(is_cloud_url("abfss://container@account.dfs.core.windows.net/path"));
        assert!(!is_cloud_url(""));
    }

    #[test]
    fn test_cloud_storage_config_bucket_name() {
        let s3 = CloudStorageConfig::S3 {
            bucket: "my-s3-bucket".to_string(),
            region: None,
            endpoint: None,
            access_key_id: None,
            secret_access_key: None,
            session_token: None,
            virtual_hosted_style: false,
        };
        assert_eq!(s3.bucket_name(), "my-s3-bucket");

        let gcs = CloudStorageConfig::Gcs {
            bucket: "my-gcs-bucket".to_string(),
            service_account_path: None,
            service_account_key: None,
        };
        assert_eq!(gcs.bucket_name(), "my-gcs-bucket");

        let azure = CloudStorageConfig::Azure {
            container: "my-container".to_string(),
            account: "myaccount".to_string(),
            access_key: None,
            sas_token: None,
        };
        assert_eq!(azure.bucket_name(), "my-container");
    }

    #[test]
    fn test_cloud_storage_config_to_url() {
        let s3 = CloudStorageConfig::S3 {
            bucket: "bucket".to_string(),
            region: None,
            endpoint: None,
            access_key_id: None,
            secret_access_key: None,
            session_token: None,
            virtual_hosted_style: false,
        };
        assert_eq!(s3.to_url(), "s3://bucket");

        let gcs = CloudStorageConfig::Gcs {
            bucket: "bucket".to_string(),
            service_account_path: None,
            service_account_key: None,
        };
        assert_eq!(gcs.to_url(), "gs://bucket");

        let azure = CloudStorageConfig::Azure {
            container: "container".to_string(),
            account: "account".to_string(),
            access_key: None,
            sas_token: None,
        };
        assert_eq!(azure.to_url(), "az://account/container");
    }

    #[tokio::test]
    async fn test_build_store_from_local_url() {
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().to_str().unwrap();

        let result = build_store_from_url(path);
        assert!(result.is_ok());

        let (store, prefix) = result.unwrap();
        assert!(prefix.as_ref().is_empty());

        // Verify the store works
        let test_path = Path::from("test.txt");
        store
            .put(&test_path, bytes::Bytes::from("hello").into())
            .await
            .unwrap();

        let data = store.get(&test_path).await.unwrap().bytes().await.unwrap();
        assert_eq!(data.as_ref(), b"hello");
    }

    #[tokio::test]
    async fn test_build_store_from_file_url() {
        let temp_dir = tempfile::tempdir().unwrap();
        let url = format!("file://{}", temp_dir.path().display());

        let (store, prefix) = build_store_from_url(&url).unwrap();
        assert!(prefix.as_ref().is_empty());

        let test_path = Path::from("via-file-url.txt");
        store
            .put(&test_path, bytes::Bytes::from("ok").into())
            .await
            .unwrap();
        let data = store.get(&test_path).await.unwrap().bytes().await.unwrap();
        assert_eq!(data.as_ref(), b"ok");
    }

    #[test]
    fn test_build_store_from_url_rejects_unsupported_scheme() {
        let err = build_store_from_url("ftp://host/path").unwrap_err();
        assert!(
            err.to_string().contains("Unsupported URL scheme 'ftp'"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn test_build_store_from_url_azure_missing_container() {
        for url in ["az://account", "az://account/"] {
            let err = build_store_from_url(url).unwrap_err();
            assert!(
                err.to_string().contains("Azure URL missing container"),
                "{url}: unexpected error: {err}"
            );
        }
    }

    #[tokio::test]
    async fn test_copy_store_prefix_between_local_stores() {
        let src_dir = tempfile::tempdir().unwrap();
        let dst_dir = tempfile::tempdir().unwrap();
        let (src, _) = build_store_from_url(src_dir.path().to_str().unwrap()).unwrap();
        let (dst, _) = build_store_from_url(dst_dir.path().to_str().unwrap()).unwrap();

        for (name, body) in [
            ("data/a.txt", "a"),
            ("data/nested/b.txt", "b"),
            ("other/c.txt", "c"),
        ] {
            src.put(&Path::from(name), bytes::Bytes::from(body).into())
                .await
                .unwrap();
        }

        let copied = copy_store_prefix(&src, &dst, &Path::from("data"), &Path::from("backup"))
            .await
            .unwrap();
        assert_eq!(copied, 2);

        let a = dst
            .get(&Path::from("backup/a.txt"))
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        assert_eq!(a.as_ref(), b"a");

        let b = dst
            .get(&Path::from("backup/nested/b.txt"))
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        assert_eq!(b.as_ref(), b"b");

        // Objects outside the source prefix must not be copied.
        assert!(dst.get(&Path::from("backup/c.txt")).await.is_err());
    }
}