1use std::sync::Arc;
21
22use anyhow::{Result, anyhow};
23use object_store::ObjectStore;
24use object_store::ObjectStoreExt;
25use object_store::aws::AmazonS3Builder;
26use object_store::azure::MicrosoftAzureBuilder;
27use object_store::gcp::GoogleCloudStorageBuilder;
28use object_store::local::LocalFileSystem;
29use object_store::path::Path;
30use uni_common::CloudStorageConfig;
31
32pub fn build_cloud_store(config: &CloudStorageConfig) -> Result<Arc<dyn ObjectStore>> {
57 match config {
58 CloudStorageConfig::S3 {
59 bucket,
60 region,
61 endpoint,
62 access_key_id,
63 secret_access_key,
64 session_token,
65 virtual_hosted_style,
66 } => {
67 let mut builder = AmazonS3Builder::new().with_bucket_name(bucket);
68
69 if let Some(r) = region {
70 builder = builder.with_region(r);
71 }
72
73 if let Some(e) = endpoint {
74 builder = builder.with_endpoint(e);
75 }
76
77 if let Some(key) = access_key_id {
78 builder = builder.with_access_key_id(key);
79 }
80
81 if let Some(secret) = secret_access_key {
82 builder = builder.with_secret_access_key(secret);
83 }
84
85 if let Some(token) = session_token {
86 builder = builder.with_token(token);
87 }
88
89 builder = builder.with_virtual_hosted_style_request(*virtual_hosted_style);
91
92 if endpoint.as_ref().is_some_and(|e| e.starts_with("http://")) {
94 builder = builder.with_allow_http(true);
95 }
96
97 let store = builder.build()?;
98 Ok(Arc::new(store))
99 }
100 CloudStorageConfig::Gcs {
101 bucket,
102 service_account_path,
103 service_account_key,
104 } => {
105 let mut builder = GoogleCloudStorageBuilder::new().with_bucket_name(bucket);
106
107 if let Some(path) = service_account_path {
108 builder = builder.with_service_account_path(path);
109 }
110
111 if let Some(key) = service_account_key {
112 builder = builder.with_service_account_key(key);
113 }
114
115 let store = builder.build()?;
116 Ok(Arc::new(store))
117 }
118 CloudStorageConfig::Azure {
119 container,
120 account,
121 access_key,
122 sas_token,
123 } => {
124 let mut builder = MicrosoftAzureBuilder::new()
125 .with_container_name(container)
126 .with_account(account);
127
128 if let Some(key) = access_key {
129 builder = builder.with_access_key(key);
130 }
131
132 if let Some(token) = sas_token {
133 let query_pairs: Vec<(String, String)> = token
136 .trim_start_matches('?')
137 .split('&')
138 .filter_map(|pair| {
139 let mut parts = pair.splitn(2, '=');
140 let key = parts.next()?;
141 let value = parts.next().unwrap_or("");
142 Some((key.to_string(), value.to_string()))
143 })
144 .collect();
145 builder = builder.with_sas_authorization(query_pairs);
146 }
147
148 let store = builder.build()?;
149 Ok(Arc::new(store))
150 }
151 }
152}
153
154pub fn build_store_from_url(url: &str) -> Result<(Arc<dyn ObjectStore>, Path)> {
173 if !url.contains("://") {
175 let local_path = std::path::Path::new(url);
176 let store = Arc::new(LocalFileSystem::new_with_prefix(local_path)?);
177 return Ok((store, Path::from("")));
178 }
179
180 if let Some(path) = url.strip_prefix("file://") {
181 let local_path = std::path::Path::new(path);
182 let store = Arc::new(LocalFileSystem::new_with_prefix(local_path)?);
183 return Ok((store, Path::from("")));
184 }
185
186 let parsed = url::Url::parse(url).map_err(|e| anyhow!("Invalid URL '{}': {}", url, e))?;
187 let scheme = parsed.scheme();
188
189 match scheme {
190 "s3" => {
191 let bucket = parsed
192 .host_str()
193 .ok_or_else(|| anyhow!("S3 URL missing bucket: {}", url))?;
194 let prefix = parsed.path().trim_start_matches('/');
195
196 let config = CloudStorageConfig::s3_from_env(bucket);
197 let store = build_cloud_store(&config)?;
198
199 Ok((store, Path::from(prefix)))
200 }
201 "gs" => {
202 let bucket = parsed
203 .host_str()
204 .ok_or_else(|| anyhow!("GCS URL missing bucket: {}", url))?;
205 let prefix = parsed.path().trim_start_matches('/');
206
207 let config = CloudStorageConfig::gcs_from_env(bucket);
208 let store = build_cloud_store(&config)?;
209
210 Ok((store, Path::from(prefix)))
211 }
212 "az" | "azure" | "abfs" | "abfss" => {
213 let account = parsed
215 .host_str()
216 .ok_or_else(|| anyhow!("Azure URL missing account: {}", url))?;
217 let path_parts: Vec<&str> = parsed
218 .path()
219 .trim_start_matches('/')
220 .splitn(2, '/')
221 .collect();
222
223 if path_parts.is_empty() || path_parts[0].is_empty() {
224 return Err(anyhow!("Azure URL missing container: {}", url));
225 }
226
227 let container = path_parts[0];
228 let prefix = path_parts.get(1).unwrap_or(&"");
229
230 let config = CloudStorageConfig::Azure {
231 container: container.to_string(),
232 account: account.to_string(),
233 access_key: std::env::var("AZURE_STORAGE_ACCESS_KEY").ok(),
234 sas_token: std::env::var("AZURE_STORAGE_SAS_TOKEN").ok(),
235 };
236
237 let store = build_cloud_store(&config)?;
238
239 Ok((store, Path::from(*prefix)))
240 }
241 _ => Err(anyhow!(
242 "Unsupported URL scheme '{}'. Supported: s3://, gs://, az://",
243 scheme
244 )),
245 }
246}
247
248pub async fn copy_store_prefix(
257 src_store: &Arc<dyn ObjectStore>,
258 dst_store: &Arc<dyn ObjectStore>,
259 src_prefix: &Path,
260 dst_prefix: &Path,
261) -> Result<usize> {
262 use futures::TryStreamExt;
263
264 let mut copied_count = 0usize;
265 let mut stream = src_store.list(Some(src_prefix));
266
267 while let Some(meta) = stream.try_next().await? {
268 let bytes = src_store.get(&meta.location).await?.bytes().await?;
269
270 let relative = meta
272 .location
273 .as_ref()
274 .strip_prefix(src_prefix.as_ref())
275 .unwrap_or(meta.location.as_ref())
276 .trim_start_matches('/');
277
278 let dst_path = if dst_prefix.as_ref().is_empty() {
279 Path::from(relative)
280 } else {
281 Path::from(format!("{}/{}", dst_prefix.as_ref(), relative))
282 };
283
284 dst_store.put(&dst_path, bytes.into()).await?;
285 copied_count += 1;
286 }
287
288 Ok(copied_count)
289}
290
/// Returns `true` if `url` uses one of the cloud object-store schemes handled by
/// `build_store_from_url`: `s3`, `gs`, `az`, `azure`, `abfs` or `abfss`.
///
/// The scheme is compared ASCII case-insensitively. This matches `url::Url::parse`,
/// which lowercases schemes, so `S3://bucket` is treated the same as `s3://bucket`.
/// Local paths and `file://` URLs return `false`.
#[must_use]
pub fn is_cloud_url(url: &str) -> bool {
    const CLOUD_SCHEMES: [&str; 6] = ["s3", "gs", "az", "azure", "abfs", "abfss"];

    url.split_once("://").is_some_and(|(scheme, _)| {
        CLOUD_SCHEMES
            .iter()
            .any(|candidate| scheme.eq_ignore_ascii_case(candidate))
    })
}
301
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_is_cloud_url() {
        assert!(is_cloud_url("s3://bucket/path"));
        assert!(is_cloud_url("gs://bucket/path"));
        assert!(is_cloud_url("az://account/container"));
        assert!(is_cloud_url("azure://account/container"));
        assert!(is_cloud_url("abfs://container@account.dfs.core.windows.net/path"));
        assert!(is_cloud_url("abfss://container@account.dfs.core.windows.net/path"));
        assert!(!is_cloud_url("/local/path"));
        assert!(!is_cloud_url("file:///local/path"));
        assert!(!is_cloud_url("./relative/path"));
    }

    #[test]
    fn test_cloud_storage_config_bucket_name() {
        let s3 = CloudStorageConfig::S3 {
            bucket: "my-s3-bucket".to_string(),
            region: None,
            endpoint: None,
            access_key_id: None,
            secret_access_key: None,
            session_token: None,
            virtual_hosted_style: false,
        };
        assert_eq!(s3.bucket_name(), "my-s3-bucket");

        let gcs = CloudStorageConfig::Gcs {
            bucket: "my-gcs-bucket".to_string(),
            service_account_path: None,
            service_account_key: None,
        };
        assert_eq!(gcs.bucket_name(), "my-gcs-bucket");

        let azure = CloudStorageConfig::Azure {
            container: "my-container".to_string(),
            account: "myaccount".to_string(),
            access_key: None,
            sas_token: None,
        };
        assert_eq!(azure.bucket_name(), "my-container");
    }

    #[test]
    fn test_cloud_storage_config_to_url() {
        let s3 = CloudStorageConfig::S3 {
            bucket: "bucket".to_string(),
            region: None,
            endpoint: None,
            access_key_id: None,
            secret_access_key: None,
            session_token: None,
            virtual_hosted_style: false,
        };
        assert_eq!(s3.to_url(), "s3://bucket");

        let gcs = CloudStorageConfig::Gcs {
            bucket: "bucket".to_string(),
            service_account_path: None,
            service_account_key: None,
        };
        assert_eq!(gcs.to_url(), "gs://bucket");

        let azure = CloudStorageConfig::Azure {
            container: "container".to_string(),
            account: "account".to_string(),
            access_key: None,
            sas_token: None,
        };
        assert_eq!(azure.to_url(), "az://account/container");
    }

    #[tokio::test]
    async fn test_build_store_from_local_url() {
        let temp_dir = tempfile::tempdir().unwrap();
        let path = temp_dir.path().to_str().unwrap();

        let result = build_store_from_url(path);
        assert!(result.is_ok());

        let (store, prefix) = result.unwrap();
        assert!(prefix.as_ref().is_empty());

        let test_path = Path::from("test.txt");
        store
            .put(&test_path, bytes::Bytes::from("hello").into())
            .await
            .unwrap();

        let data = store.get(&test_path).await.unwrap().bytes().await.unwrap();
        assert_eq!(data.as_ref(), b"hello");
    }

    #[test]
    fn test_build_store_from_file_url() {
        let temp_dir = tempfile::tempdir().unwrap();
        let url = format!("file://{}", temp_dir.path().display());

        let (_store, prefix) = build_store_from_url(&url).expect("file:// URL should resolve");
        assert!(prefix.as_ref().is_empty());
    }

    #[test]
    fn test_build_store_from_url_rejects_unsupported_scheme() {
        let err = build_store_from_url("ftp://host/path")
            .err()
            .expect("ftp:// must be rejected");
        assert!(err.to_string().contains("Unsupported URL scheme"));
    }

    #[test]
    fn test_build_store_from_azure_url_requires_container() {
        // Fails during URL parsing, before any store (or network client) is built.
        assert!(build_store_from_url("az://account").is_err());
    }
}