alien_bindings/providers/storage/
gcp_gcs.rs1use crate::providers::storage::credential_bridge::GcpCredentialBridge;
2use crate::providers::utils::{prefixed_path, relativize_path};
3use crate::{
4 error::{Error, ErrorData},
5 presigned::{PresignedOperation, PresignedRequest, PresignedRequestBackend},
6 traits::{Binding, Storage},
7};
8use alien_error::{AlienError, Context, IntoAlienError};
9use async_trait::async_trait;
10use bytes::Bytes;
11use chrono::Utc;
12use futures::stream::BoxStream;
13use futures::TryStreamExt as _;
14use object_store::signer::Signer;
15use object_store::{
16 gcp::GoogleCloudStorage, path::Path, GetOptions, GetResult, ListResult, ObjectMeta,
17 ObjectStore, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result as ObjectStoreResult,
18};
19use reqwest::Method;
20use std::collections::HashMap;
21use std::sync::Arc;
22use std::time::Duration;
23use url::Url;
24
25#[derive(Debug)]
27pub struct GcsStorage {
28 url: Url,
29 base_dir: Path,
30 inner: GoogleCloudStorage,
31}
32
33impl GcsStorage {
34 pub fn new(
38 bucket_name: String,
39 gcp_config: &alien_core::GcpClientConfig,
40 ) -> Result<Self, Error> {
41 let gcs_url = format!("gs://{}", bucket_name);
42 let url = Url::parse(&gcs_url).into_alien_error().context(
43 ErrorData::InvalidConfigurationUrl {
44 url: gcs_url.clone(),
45 reason: "Invalid GCS URL format".to_string(),
46 },
47 )?;
48
49 let credentials = GcpCredentialBridge::new(gcp_config.clone());
53 let mut builder = object_store::gcp::GoogleCloudStorageBuilder::new()
54 .with_bucket_name(&bucket_name)
55 .with_credentials(Arc::new(credentials));
56
57 if let alien_core::GcpCredentials::ServiceAccountKey { json } = &gcp_config.credentials {
58 builder = builder.with_service_account_key(json);
59 }
60
61 let store = builder
62 .build()
63 .into_alien_error()
64 .context(ErrorData::BindingSetupFailed {
65 binding_type: "GCP GCS storage".to_string(),
66 reason: format!("Failed to build GCS client for bucket: {}", bucket_name),
67 })?;
68
69 let base_dir = match url.path_segments() {
71 Some(segments) => Path::from_iter(segments.filter(|s| !s.is_empty())),
72 None => Path::default(), };
74
75 Ok(Self {
76 url,
77 base_dir,
78 inner: store,
79 })
80 }
81}
82
83impl Binding for GcsStorage {}
84
85#[async_trait]
86impl Storage for GcsStorage {
87 fn get_base_dir(&self) -> Path {
88 self.base_dir.clone()
89 }
90
91 fn get_url(&self) -> Url {
92 self.url.clone()
93 }
94
95 async fn presigned_put(
96 &self,
97 path: &Path,
98 expires_in: Duration,
99 ) -> crate::error::Result<PresignedRequest> {
100 let dst = prefixed_path(&self.base_dir, path);
104 let signed_url = self
105 .inner
106 .signed_url(Method::PUT, &dst, expires_in)
107 .await
108 .into_alien_error()
109 .context(ErrorData::StorageOperationFailed {
110 binding_name: "gcp-gcs".to_string(),
111 operation: format!("generate presigned PUT URL for {}", path),
112 })?;
113
114 let headers = HashMap::new();
115
116 Ok(PresignedRequest {
117 backend: PresignedRequestBackend::Http {
118 url: signed_url.to_string(),
119 method: "PUT".to_string(),
120 headers,
121 },
122 expiration: Utc::now()
123 + chrono::Duration::from_std(expires_in).map_err(|e| {
124 AlienError::new(ErrorData::Other {
125 message: format!("Invalid duration: {}", e),
126 })
127 })?,
128 operation: PresignedOperation::Put,
129 path: path.to_string(),
130 })
131 }
132
133 async fn presigned_get(
134 &self,
135 path: &Path,
136 expires_in: Duration,
137 ) -> crate::error::Result<PresignedRequest> {
138 let dst = prefixed_path(&self.base_dir, path);
139 let signed_url = self
140 .inner
141 .signed_url(Method::GET, &dst, expires_in)
142 .await
143 .into_alien_error()
144 .context(ErrorData::StorageOperationFailed {
145 binding_name: "gcp-gcs".to_string(),
146 operation: format!("generate presigned GET URL for {}", path),
147 })?;
148
149 let headers = HashMap::new();
150
151 Ok(PresignedRequest {
152 backend: PresignedRequestBackend::Http {
153 url: signed_url.to_string(),
154 method: "GET".to_string(),
155 headers,
156 },
157 expiration: Utc::now()
158 + chrono::Duration::from_std(expires_in).map_err(|e| {
159 AlienError::new(ErrorData::Other {
160 message: format!("Invalid duration: {}", e),
161 })
162 })?,
163 operation: PresignedOperation::Get,
164 path: path.to_string(),
165 })
166 }
167
168 async fn presigned_delete(
169 &self,
170 path: &Path,
171 expires_in: Duration,
172 ) -> crate::error::Result<PresignedRequest> {
173 let dst = prefixed_path(&self.base_dir, path);
174 let signed_url = self
175 .inner
176 .signed_url(Method::DELETE, &dst, expires_in)
177 .await
178 .into_alien_error()
179 .context(ErrorData::StorageOperationFailed {
180 binding_name: "gcp-gcs".to_string(),
181 operation: format!("generate presigned DELETE URL for {}", path),
182 })?;
183
184 let headers = HashMap::new();
185
186 Ok(PresignedRequest {
187 backend: PresignedRequestBackend::Http {
188 url: signed_url.to_string(),
189 method: "DELETE".to_string(),
190 headers,
191 },
192 expiration: Utc::now()
193 + chrono::Duration::from_std(expires_in).map_err(|e| {
194 AlienError::new(ErrorData::Other {
195 message: format!("Invalid duration: {}", e),
196 })
197 })?,
198 operation: PresignedOperation::Delete,
199 path: path.to_string(),
200 })
201 }
202}
203
204#[async_trait]
207impl ObjectStore for GcsStorage {
208 async fn put(&self, location: &Path, payload: PutPayload) -> ObjectStoreResult<PutResult> {
209 let dst = prefixed_path(&self.base_dir, location);
210 self.inner.put(&dst, payload).await
211 }
212
213 async fn put_opts(
214 &self,
215 location: &Path,
216 payload: PutPayload,
217 opts: PutOptions,
218 ) -> ObjectStoreResult<PutResult> {
219 let dst = prefixed_path(&self.base_dir, location);
220 self.inner.put_opts(&dst, payload, opts).await
221 }
222
223 async fn put_multipart(
224 &self,
225 location: &Path,
226 ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
227 let dst = prefixed_path(&self.base_dir, location);
228 self.inner.put_multipart(&dst).await
229 }
230
231 async fn put_multipart_opts(
232 &self,
233 location: &Path,
234 opts: PutMultipartOpts,
235 ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
236 let dst = prefixed_path(&self.base_dir, location);
237 self.inner.put_multipart_opts(&dst, opts).await
238 }
239
240 async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
241 let src = prefixed_path(&self.base_dir, location);
242 self.inner.get(&src).await
243 }
244
245 async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
246 let src = prefixed_path(&self.base_dir, location);
247 self.inner.get_opts(&src, options).await
248 }
249
250 async fn get_range(
251 &self,
252 location: &Path,
253 range: std::ops::Range<u64>,
254 ) -> ObjectStoreResult<Bytes> {
255 let src = prefixed_path(&self.base_dir, location);
256 self.inner.get_range(&src, range).await
257 }
258
259 async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
260 let src = prefixed_path(&self.base_dir, location);
261 let mut meta = self.inner.head(&src).await?;
262 meta.location = relativize_path(&self.base_dir, meta.location, "GcpStorage")?;
263 Ok(meta)
264 }
265
266 async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
267 let src = prefixed_path(&self.base_dir, location);
268 self.inner.delete(&src).await
269 }
270
271 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
272 let list_prefix_for_inner = prefix
273 .map(|p| prefixed_path(&self.base_dir, p))
274 .unwrap_or_else(|| self.base_dir.clone());
275
276 let base_dir_for_stream = self.base_dir.clone();
277
278 Box::pin(
279 self.inner
280 .list(Some(&list_prefix_for_inner))
281 .and_then(move |mut meta| {
282 let captured_base_dir = base_dir_for_stream.clone();
283 async move {
284 meta.location =
285 relativize_path(&captured_base_dir, meta.location, "GcpStorage")?;
286 Ok(meta)
287 }
288 }),
289 )
290 }
291
292 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult<ListResult> {
293 let list_prefix_for_inner = prefix
294 .map(|p| prefixed_path(&self.base_dir, p))
295 .unwrap_or_else(|| self.base_dir.clone());
296 let mut result = self
297 .inner
298 .list_with_delimiter(Some(&list_prefix_for_inner))
299 .await?;
300
301 for meta_obj in &mut result.objects {
302 let original_location = std::mem::take(&mut meta_obj.location);
303 meta_obj.location = relativize_path(&self.base_dir, original_location, "GcpStorage")?;
304 }
305
306 let mut new_common_prefixes = Vec::with_capacity(result.common_prefixes.len());
307 for cp in result.common_prefixes {
308 new_common_prefixes.push(relativize_path(&self.base_dir, cp, "GcpStorage")?);
309 }
310 result.common_prefixes = new_common_prefixes;
311
312 Ok(result)
313 }
314
315 async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
316 let src = prefixed_path(&self.base_dir, from);
317 let dst = prefixed_path(&self.base_dir, to);
318 self.inner.copy(&src, &dst).await
319 }
320
321 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
322 let src = prefixed_path(&self.base_dir, from);
323 let dst = prefixed_path(&self.base_dir, to);
324 self.inner.copy_if_not_exists(&src, &dst).await
325 }
326}
327
328impl std::fmt::Display for GcsStorage {
329 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
330 write!(f, "GcpStorage(url={})", self.url)
331 }
332}