Skip to main content

alien_bindings/providers/storage/
gcp_gcs.rs

1use crate::providers::storage::credential_bridge::GcpCredentialBridge;
2use crate::providers::utils::{prefixed_path, relativize_path};
3use crate::{
4    error::{Error, ErrorData},
5    presigned::{PresignedOperation, PresignedRequest, PresignedRequestBackend},
6    traits::{Binding, Storage},
7};
8use alien_error::{AlienError, Context, IntoAlienError};
9use async_trait::async_trait;
10use bytes::Bytes;
11use chrono::Utc;
12use futures::stream::BoxStream;
13use futures::TryStreamExt as _;
14use object_store::signer::Signer;
15use object_store::{
16    gcp::GoogleCloudStorage, path::Path, GetOptions, GetResult, ListResult, ObjectMeta,
17    ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult,
18    Result as ObjectStoreResult,
19};
20use reqwest::Method;
21use std::collections::HashMap;
22use std::sync::Arc;
23use std::time::Duration;
24use url::Url;
25
26/// Google Cloud Storage implementation.
27#[derive(Debug)]
28pub struct GcsStorage {
29    url: Url,
30    base_dir: Path,
31    inner: GoogleCloudStorage,
32}
33
34impl GcsStorage {
35    /// Creates a new `GcsStorage` instance from bucket configuration.
36    ///
37    /// Uses GCP config for credentials.
38    pub fn new(
39        bucket_name: String,
40        gcp_config: &alien_core::GcpClientConfig,
41    ) -> Result<Self, Error> {
42        let gcs_url = format!("gs://{}", bucket_name);
43        let url = Url::parse(&gcs_url).into_alien_error().context(
44            ErrorData::InvalidConfigurationUrl {
45                url: gcs_url.clone(),
46                reason: "Invalid GCS URL format".to_string(),
47            },
48        )?;
49
50        // Build the store with credentials bridged from GcpClientConfig.
51        // For presigned URLs, object_store needs signing credentials (private key), so
52        // provide the service account key directly when that credential mode is used.
53        let credentials = GcpCredentialBridge::new(gcp_config.clone());
54        let mut builder = object_store::gcp::GoogleCloudStorageBuilder::new()
55            .with_bucket_name(&bucket_name)
56            .with_credentials(Arc::new(credentials));
57
58        if let alien_core::GcpCredentials::ServiceAccountKey { json } = &gcp_config.credentials {
59            builder = builder.with_service_account_key(json);
60        }
61
62        let store = builder
63            .build()
64            .into_alien_error()
65            .context(ErrorData::BindingSetupFailed {
66                binding_type: "GCP GCS storage".to_string(),
67                reason: format!("Failed to build GCS client for bucket: {}", bucket_name),
68            })?;
69
70        // Extract the base path from the URL path segments, handling the None case.
71        let base_dir = match url.path_segments() {
72            Some(segments) => Path::from_iter(segments.filter(|s| !s.is_empty())),
73            None => Path::default(), // Use an empty path if there are no segments
74        };
75
76        Ok(Self {
77            url,
78            base_dir,
79            inner: store,
80        })
81    }
82}
83
84impl Binding for GcsStorage {}
85
86#[async_trait]
87impl Storage for GcsStorage {
88    fn get_base_dir(&self) -> Path {
89        self.base_dir.clone()
90    }
91
92    fn get_url(&self) -> Url {
93        self.url.clone()
94    }
95
96    async fn presigned_put(
97        &self,
98        path: &Path,
99        expires_in: Duration,
100    ) -> crate::error::Result<PresignedRequest> {
101        // Note: Presigned URLs require signing credentials (private key).
102        // This works with ServiceAccountKey but may fail with AccessToken,
103        // ServiceMetadata, or ProjectedServiceAccount credentials.
104        let dst = prefixed_path(&self.base_dir, path);
105        let signed_url = self
106            .inner
107            .signed_url(Method::PUT, &dst, expires_in)
108            .await
109            .into_alien_error()
110            .context(ErrorData::StorageOperationFailed {
111                binding_name: "gcp-gcs".to_string(),
112                operation: format!("generate presigned PUT URL for {}", path),
113            })?;
114
115        let headers = HashMap::new();
116
117        Ok(PresignedRequest {
118            backend: PresignedRequestBackend::Http {
119                url: signed_url.to_string(),
120                method: "PUT".to_string(),
121                headers,
122            },
123            expiration: Utc::now()
124                + chrono::Duration::from_std(expires_in).map_err(|e| {
125                    AlienError::new(ErrorData::Other {
126                        message: format!("Invalid duration: {}", e),
127                    })
128                })?,
129            operation: PresignedOperation::Put,
130            path: path.to_string(),
131        })
132    }
133
134    async fn presigned_get(
135        &self,
136        path: &Path,
137        expires_in: Duration,
138    ) -> crate::error::Result<PresignedRequest> {
139        let dst = prefixed_path(&self.base_dir, path);
140        let signed_url = self
141            .inner
142            .signed_url(Method::GET, &dst, expires_in)
143            .await
144            .into_alien_error()
145            .context(ErrorData::StorageOperationFailed {
146                binding_name: "gcp-gcs".to_string(),
147                operation: format!("generate presigned GET URL for {}", path),
148            })?;
149
150        let headers = HashMap::new();
151
152        Ok(PresignedRequest {
153            backend: PresignedRequestBackend::Http {
154                url: signed_url.to_string(),
155                method: "GET".to_string(),
156                headers,
157            },
158            expiration: Utc::now()
159                + chrono::Duration::from_std(expires_in).map_err(|e| {
160                    AlienError::new(ErrorData::Other {
161                        message: format!("Invalid duration: {}", e),
162                    })
163                })?,
164            operation: PresignedOperation::Get,
165            path: path.to_string(),
166        })
167    }
168
169    async fn presigned_delete(
170        &self,
171        path: &Path,
172        expires_in: Duration,
173    ) -> crate::error::Result<PresignedRequest> {
174        let dst = prefixed_path(&self.base_dir, path);
175        let signed_url = self
176            .inner
177            .signed_url(Method::DELETE, &dst, expires_in)
178            .await
179            .into_alien_error()
180            .context(ErrorData::StorageOperationFailed {
181                binding_name: "gcp-gcs".to_string(),
182                operation: format!("generate presigned DELETE URL for {}", path),
183            })?;
184
185        let headers = HashMap::new();
186
187        Ok(PresignedRequest {
188            backend: PresignedRequestBackend::Http {
189                url: signed_url.to_string(),
190                method: "DELETE".to_string(),
191                headers,
192            },
193            expiration: Utc::now()
194                + chrono::Duration::from_std(expires_in).map_err(|e| {
195                    AlienError::new(ErrorData::Other {
196                        message: format!("Invalid duration: {}", e),
197                    })
198                })?,
199            operation: PresignedOperation::Delete,
200            path: path.to_string(),
201        })
202    }
203}
204
205// Delegate ObjectStore trait implementation to the inner store,
206// prefixing paths with the base_dir.
207#[async_trait]
208impl ObjectStore for GcsStorage {
209    async fn put(&self, location: &Path, payload: PutPayload) -> ObjectStoreResult<PutResult> {
210        let dst = prefixed_path(&self.base_dir, location);
211        self.inner.put(&dst, payload).await
212    }
213
214    async fn put_opts(
215        &self,
216        location: &Path,
217        payload: PutPayload,
218        opts: PutOptions,
219    ) -> ObjectStoreResult<PutResult> {
220        let dst = prefixed_path(&self.base_dir, location);
221        self.inner.put_opts(&dst, payload, opts).await
222    }
223
224    async fn put_multipart(
225        &self,
226        location: &Path,
227    ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
228        let dst = prefixed_path(&self.base_dir, location);
229        self.inner.put_multipart(&dst).await
230    }
231
232    async fn put_multipart_opts(
233        &self,
234        location: &Path,
235        opts: PutMultipartOptions,
236    ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
237        let dst = prefixed_path(&self.base_dir, location);
238        self.inner.put_multipart_opts(&dst, opts).await
239    }
240
241    async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
242        let src = prefixed_path(&self.base_dir, location);
243        self.inner.get(&src).await
244    }
245
246    async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
247        let src = prefixed_path(&self.base_dir, location);
248        self.inner.get_opts(&src, options).await
249    }
250
251    async fn get_range(
252        &self,
253        location: &Path,
254        range: std::ops::Range<u64>,
255    ) -> ObjectStoreResult<Bytes> {
256        let src = prefixed_path(&self.base_dir, location);
257        self.inner.get_range(&src, range).await
258    }
259
260    async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
261        let src = prefixed_path(&self.base_dir, location);
262        let mut meta = self.inner.head(&src).await?;
263        meta.location = relativize_path(&self.base_dir, meta.location, "GcpStorage")?;
264        Ok(meta)
265    }
266
267    async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
268        let src = prefixed_path(&self.base_dir, location);
269        self.inner.delete(&src).await
270    }
271
272    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
273        let list_prefix_for_inner = prefix
274            .map(|p| prefixed_path(&self.base_dir, p))
275            .unwrap_or_else(|| self.base_dir.clone());
276
277        let base_dir_for_stream = self.base_dir.clone();
278
279        Box::pin(
280            self.inner
281                .list(Some(&list_prefix_for_inner))
282                .and_then(move |mut meta| {
283                    let captured_base_dir = base_dir_for_stream.clone();
284                    async move {
285                        meta.location =
286                            relativize_path(&captured_base_dir, meta.location, "GcpStorage")?;
287                        Ok(meta)
288                    }
289                }),
290        )
291    }
292
293    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult<ListResult> {
294        let list_prefix_for_inner = prefix
295            .map(|p| prefixed_path(&self.base_dir, p))
296            .unwrap_or_else(|| self.base_dir.clone());
297        let mut result = self
298            .inner
299            .list_with_delimiter(Some(&list_prefix_for_inner))
300            .await?;
301
302        for meta_obj in &mut result.objects {
303            let original_location = std::mem::take(&mut meta_obj.location);
304            meta_obj.location = relativize_path(&self.base_dir, original_location, "GcpStorage")?;
305        }
306
307        let mut new_common_prefixes = Vec::with_capacity(result.common_prefixes.len());
308        for cp in result.common_prefixes {
309            new_common_prefixes.push(relativize_path(&self.base_dir, cp, "GcpStorage")?);
310        }
311        result.common_prefixes = new_common_prefixes;
312
313        Ok(result)
314    }
315
316    async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
317        let src = prefixed_path(&self.base_dir, from);
318        let dst = prefixed_path(&self.base_dir, to);
319        self.inner.copy(&src, &dst).await
320    }
321
322    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
323        let src = prefixed_path(&self.base_dir, from);
324        let dst = prefixed_path(&self.base_dir, to);
325        self.inner.copy_if_not_exists(&src, &dst).await
326    }
327}
328
329impl std::fmt::Display for GcsStorage {
330    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
331        write!(f, "GcpStorage(url={})", self.url)
332    }
333}