// alien_bindings/providers/storage/gcp_gcs.rs

1use crate::providers::storage::credential_bridge::GcpCredentialBridge;
2use crate::providers::utils::{prefixed_path, relativize_path};
3use crate::{
4    error::{Error, ErrorData},
5    presigned::{PresignedOperation, PresignedRequest, PresignedRequestBackend},
6    traits::{Binding, Storage},
7};
8use alien_error::{AlienError, Context, IntoAlienError};
9use async_trait::async_trait;
10use bytes::Bytes;
11use chrono::Utc;
12use futures::stream::BoxStream;
13use futures::TryStreamExt as _;
14use object_store::signer::Signer;
15use object_store::{
16    gcp::GoogleCloudStorage, path::Path, GetOptions, GetResult, ListResult, ObjectMeta,
17    ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as ObjectStoreResult,
18};
19use reqwest::Method;
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Duration;
23use url::Url;
24
/// Google Cloud Storage implementation.
///
/// Bundles an `object_store` GCS client with the bucket URL and the base
/// directory that is prepended to object paths before they reach the client.
#[derive(Debug)]
pub struct GcsStorage {
    /// Bucket URL in `gs://<bucket>` form; handed back by `get_url`.
    url: Url,
    /// Path prefix applied to every location before delegating to `inner`.
    base_dir: Path,
    /// Underlying `object_store` GCS client that all operations delegate to.
    inner: GoogleCloudStorage,
}
32
33impl GcsStorage {
34    /// Creates a new `GcsStorage` instance from bucket configuration.
35    ///
36    /// Uses GCP config for credentials.
37    pub fn new(
38        bucket_name: String,
39        gcp_config: &alien_core::GcpClientConfig,
40    ) -> Result<Self, Error> {
41        let gcs_url = format!("gs://{}", bucket_name);
42        let url = Url::parse(&gcs_url).into_alien_error().context(
43            ErrorData::InvalidConfigurationUrl {
44                url: gcs_url.clone(),
45                reason: "Invalid GCS URL format".to_string(),
46            },
47        )?;
48
49        // Build the store with credentials bridged from GcpClientConfig.
50        // For presigned URLs, object_store needs signing credentials (private key), so
51        // provide the service account key directly when that credential mode is used.
52        let credentials = GcpCredentialBridge::new(gcp_config.clone());
53        let mut builder = object_store::gcp::GoogleCloudStorageBuilder::new()
54            .with_bucket_name(&bucket_name)
55            .with_credentials(Arc::new(credentials));
56
57        if let alien_core::GcpCredentials::ServiceAccountKey { json } = &gcp_config.credentials {
58            builder = builder.with_service_account_key(json);
59        }
60
61        let store = builder
62            .build()
63            .into_alien_error()
64            .context(ErrorData::BindingSetupFailed {
65                binding_type: "GCP GCS storage".to_string(),
66                reason: format!("Failed to build GCS client for bucket: {}", bucket_name),
67            })?;
68
69        // Extract the base path from the URL path segments, handling the None case.
70        let base_dir = match url.path_segments() {
71            Some(segments) => Path::from_iter(segments.filter(|s| !s.is_empty())),
72            None => Path::default(), // Use an empty path if there are no segments
73        };
74
75        Ok(Self {
76            url,
77            base_dir,
78            inner: store,
79        })
80    }
81}
82
// `Binding` is implemented with an empty body: no items are defined here.
impl Binding for GcsStorage {}
84
85#[async_trait]
86impl Storage for GcsStorage {
87    fn get_base_dir(&self) -> Path {
88        self.base_dir.clone()
89    }
90
91    fn get_url(&self) -> Url {
92        self.url.clone()
93    }
94
95    async fn presigned_put(
96        &self,
97        path: &Path,
98        expires_in: Duration,
99    ) -> crate::error::Result<PresignedRequest> {
100        // Note: Presigned URLs require signing credentials (private key).
101        // This works with ServiceAccountKey but may fail with AccessToken,
102        // ServiceMetadata, or ProjectedServiceAccount credentials.
103        let dst = prefixed_path(&self.base_dir, path);
104        let signed_url = self
105            .inner
106            .signed_url(Method::PUT, &dst, expires_in)
107            .await
108            .into_alien_error()
109            .context(ErrorData::StorageOperationFailed {
110                binding_name: "gcp-gcs".to_string(),
111                operation: format!("generate presigned PUT URL for {}", path),
112            })?;
113
114        let headers = HashMap::new();
115
116        Ok(PresignedRequest {
117            backend: PresignedRequestBackend::Http {
118                url: signed_url.to_string(),
119                method: "PUT".to_string(),
120                headers,
121            },
122            expiration: Utc::now()
123                + chrono::Duration::from_std(expires_in).map_err(|e| {
124                    AlienError::new(ErrorData::Other {
125                        message: format!("Invalid duration: {}", e),
126                    })
127                })?,
128            operation: PresignedOperation::Put,
129            path: path.to_string(),
130        })
131    }
132
133    async fn presigned_get(
134        &self,
135        path: &Path,
136        expires_in: Duration,
137    ) -> crate::error::Result<PresignedRequest> {
138        let dst = prefixed_path(&self.base_dir, path);
139        let signed_url = self
140            .inner
141            .signed_url(Method::GET, &dst, expires_in)
142            .await
143            .into_alien_error()
144            .context(ErrorData::StorageOperationFailed {
145                binding_name: "gcp-gcs".to_string(),
146                operation: format!("generate presigned GET URL for {}", path),
147            })?;
148
149        let headers = HashMap::new();
150
151        Ok(PresignedRequest {
152            backend: PresignedRequestBackend::Http {
153                url: signed_url.to_string(),
154                method: "GET".to_string(),
155                headers,
156            },
157            expiration: Utc::now()
158                + chrono::Duration::from_std(expires_in).map_err(|e| {
159                    AlienError::new(ErrorData::Other {
160                        message: format!("Invalid duration: {}", e),
161                    })
162                })?,
163            operation: PresignedOperation::Get,
164            path: path.to_string(),
165        })
166    }
167
168    async fn presigned_delete(
169        &self,
170        path: &Path,
171        expires_in: Duration,
172    ) -> crate::error::Result<PresignedRequest> {
173        let dst = prefixed_path(&self.base_dir, path);
174        let signed_url = self
175            .inner
176            .signed_url(Method::DELETE, &dst, expires_in)
177            .await
178            .into_alien_error()
179            .context(ErrorData::StorageOperationFailed {
180                binding_name: "gcp-gcs".to_string(),
181                operation: format!("generate presigned DELETE URL for {}", path),
182            })?;
183
184        let headers = HashMap::new();
185
186        Ok(PresignedRequest {
187            backend: PresignedRequestBackend::Http {
188                url: signed_url.to_string(),
189                method: "DELETE".to_string(),
190                headers,
191            },
192            expiration: Utc::now()
193                + chrono::Duration::from_std(expires_in).map_err(|e| {
194                    AlienError::new(ErrorData::Other {
195                        message: format!("Invalid duration: {}", e),
196                    })
197                })?,
198            operation: PresignedOperation::Delete,
199            path: path.to_string(),
200        })
201    }
202}
203
204// Delegate ObjectStore trait implementation to the inner store,
205// prefixing paths with the base_dir.
206#[async_trait]
207impl ObjectStore for GcsStorage {
208    async fn put(&self, location: &Path, payload: PutPayload) -> ObjectStoreResult<PutResult> {
209        let dst = prefixed_path(&self.base_dir, location);
210        self.inner.put(&dst, payload).await
211    }
212
213    async fn put_opts(
214        &self,
215        location: &Path,
216        payload: PutPayload,
217        opts: PutOptions,
218    ) -> ObjectStoreResult<PutResult> {
219        let dst = prefixed_path(&self.base_dir, location);
220        self.inner.put_opts(&dst, payload, opts).await
221    }
222
223    async fn put_multipart(
224        &self,
225        location: &Path,
226    ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
227        let dst = prefixed_path(&self.base_dir, location);
228        self.inner.put_multipart(&dst).await
229    }
230
231    async fn put_multipart_opts(
232        &self,
233        location: &Path,
234        opts: PutMultipartOpts,
235    ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
236        let dst = prefixed_path(&self.base_dir, location);
237        self.inner.put_multipart_opts(&dst, opts).await
238    }
239
240    async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
241        let src = prefixed_path(&self.base_dir, location);
242        self.inner.get(&src).await
243    }
244
245    async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
246        let src = prefixed_path(&self.base_dir, location);
247        self.inner.get_opts(&src, options).await
248    }
249
250    async fn get_range(
251        &self,
252        location: &Path,
253        range: std::ops::Range<u64>,
254    ) -> ObjectStoreResult<Bytes> {
255        let src = prefixed_path(&self.base_dir, location);
256        self.inner.get_range(&src, range).await
257    }
258
259    async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
260        let src = prefixed_path(&self.base_dir, location);
261        let mut meta = self.inner.head(&src).await?;
262        meta.location = relativize_path(&self.base_dir, meta.location, "GcpStorage")?;
263        Ok(meta)
264    }
265
266    async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
267        let src = prefixed_path(&self.base_dir, location);
268        self.inner.delete(&src).await
269    }
270
271    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
272        let list_prefix_for_inner = prefix
273            .map(|p| prefixed_path(&self.base_dir, p))
274            .unwrap_or_else(|| self.base_dir.clone());
275
276        let base_dir_for_stream = self.base_dir.clone();
277
278        Box::pin(
279            self.inner
280                .list(Some(&list_prefix_for_inner))
281                .and_then(move |mut meta| {
282                    let captured_base_dir = base_dir_for_stream.clone();
283                    async move {
284                        meta.location =
285                            relativize_path(&captured_base_dir, meta.location, "GcpStorage")?;
286                        Ok(meta)
287                    }
288                }),
289        )
290    }
291
292    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult<ListResult> {
293        let list_prefix_for_inner = prefix
294            .map(|p| prefixed_path(&self.base_dir, p))
295            .unwrap_or_else(|| self.base_dir.clone());
296        let mut result = self
297            .inner
298            .list_with_delimiter(Some(&list_prefix_for_inner))
299            .await?;
300
301        for meta_obj in &mut result.objects {
302            let original_location = std::mem::take(&mut meta_obj.location);
303            meta_obj.location = relativize_path(&self.base_dir, original_location, "GcpStorage")?;
304        }
305
306        let mut new_common_prefixes = Vec::with_capacity(result.common_prefixes.len());
307        for cp in result.common_prefixes {
308            new_common_prefixes.push(relativize_path(&self.base_dir, cp, "GcpStorage")?);
309        }
310        result.common_prefixes = new_common_prefixes;
311
312        Ok(result)
313    }
314
315    async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
316        let src = prefixed_path(&self.base_dir, from);
317        let dst = prefixed_path(&self.base_dir, to);
318        self.inner.copy(&src, &dst).await
319    }
320
321    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
322        let src = prefixed_path(&self.base_dir, from);
323        let dst = prefixed_path(&self.base_dir, to);
324        self.inner.copy_if_not_exists(&src, &dst).await
325    }
326}
327
328impl std::fmt::Display for GcsStorage {
329    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
330        write!(f, "GcpStorage(url={})", self.url)
331    }
332}