alien_bindings/providers/storage/
gcp_gcs.rs1use crate::providers::storage::credential_bridge::GcpCredentialBridge;
2use crate::providers::utils::{prefixed_path, relativize_path};
3use crate::{
4 error::{Error, ErrorData},
5 presigned::{PresignedOperation, PresignedRequest, PresignedRequestBackend},
6 traits::{Binding, Storage},
7};
8use alien_error::{AlienError, Context, IntoAlienError};
9use async_trait::async_trait;
10use bytes::Bytes;
11use chrono::Utc;
12use futures::stream::BoxStream;
13use futures::TryStreamExt as _;
14use object_store::signer::Signer;
15use object_store::{
16 gcp::GoogleCloudStorage, path::Path, GetOptions, GetResult, ListResult, ObjectMeta,
17 ObjectStore, PutMultipartOptions, PutOptions, PutPayload, PutResult,
18 Result as ObjectStoreResult,
19};
20use reqwest::Method;
21use std::collections::HashMap;
22use std::sync::Arc;
23use std::time::Duration;
24use url::Url;
25
26#[derive(Debug)]
28pub struct GcsStorage {
29 url: Url,
30 base_dir: Path,
31 inner: GoogleCloudStorage,
32}
33
34impl GcsStorage {
35 pub fn new(
39 bucket_name: String,
40 gcp_config: &alien_core::GcpClientConfig,
41 ) -> Result<Self, Error> {
42 let gcs_url = format!("gs://{}", bucket_name);
43 let url = Url::parse(&gcs_url).into_alien_error().context(
44 ErrorData::InvalidConfigurationUrl {
45 url: gcs_url.clone(),
46 reason: "Invalid GCS URL format".to_string(),
47 },
48 )?;
49
50 let credentials = GcpCredentialBridge::new(gcp_config.clone());
54 let mut builder = object_store::gcp::GoogleCloudStorageBuilder::new()
55 .with_bucket_name(&bucket_name)
56 .with_credentials(Arc::new(credentials));
57
58 if let alien_core::GcpCredentials::ServiceAccountKey { json } = &gcp_config.credentials {
59 builder = builder.with_service_account_key(json);
60 }
61
62 let store = builder
63 .build()
64 .into_alien_error()
65 .context(ErrorData::BindingSetupFailed {
66 binding_type: "GCP GCS storage".to_string(),
67 reason: format!("Failed to build GCS client for bucket: {}", bucket_name),
68 })?;
69
70 let base_dir = match url.path_segments() {
72 Some(segments) => Path::from_iter(segments.filter(|s| !s.is_empty())),
73 None => Path::default(), };
75
76 Ok(Self {
77 url,
78 base_dir,
79 inner: store,
80 })
81 }
82}
83
84impl Binding for GcsStorage {}
85
86#[async_trait]
87impl Storage for GcsStorage {
88 fn get_base_dir(&self) -> Path {
89 self.base_dir.clone()
90 }
91
92 fn get_url(&self) -> Url {
93 self.url.clone()
94 }
95
96 async fn presigned_put(
97 &self,
98 path: &Path,
99 expires_in: Duration,
100 ) -> crate::error::Result<PresignedRequest> {
101 let dst = prefixed_path(&self.base_dir, path);
105 let signed_url = self
106 .inner
107 .signed_url(Method::PUT, &dst, expires_in)
108 .await
109 .into_alien_error()
110 .context(ErrorData::StorageOperationFailed {
111 binding_name: "gcp-gcs".to_string(),
112 operation: format!("generate presigned PUT URL for {}", path),
113 })?;
114
115 let headers = HashMap::new();
116
117 Ok(PresignedRequest {
118 backend: PresignedRequestBackend::Http {
119 url: signed_url.to_string(),
120 method: "PUT".to_string(),
121 headers,
122 },
123 expiration: Utc::now()
124 + chrono::Duration::from_std(expires_in).map_err(|e| {
125 AlienError::new(ErrorData::Other {
126 message: format!("Invalid duration: {}", e),
127 })
128 })?,
129 operation: PresignedOperation::Put,
130 path: path.to_string(),
131 })
132 }
133
134 async fn presigned_get(
135 &self,
136 path: &Path,
137 expires_in: Duration,
138 ) -> crate::error::Result<PresignedRequest> {
139 let dst = prefixed_path(&self.base_dir, path);
140 let signed_url = self
141 .inner
142 .signed_url(Method::GET, &dst, expires_in)
143 .await
144 .into_alien_error()
145 .context(ErrorData::StorageOperationFailed {
146 binding_name: "gcp-gcs".to_string(),
147 operation: format!("generate presigned GET URL for {}", path),
148 })?;
149
150 let headers = HashMap::new();
151
152 Ok(PresignedRequest {
153 backend: PresignedRequestBackend::Http {
154 url: signed_url.to_string(),
155 method: "GET".to_string(),
156 headers,
157 },
158 expiration: Utc::now()
159 + chrono::Duration::from_std(expires_in).map_err(|e| {
160 AlienError::new(ErrorData::Other {
161 message: format!("Invalid duration: {}", e),
162 })
163 })?,
164 operation: PresignedOperation::Get,
165 path: path.to_string(),
166 })
167 }
168
169 async fn presigned_delete(
170 &self,
171 path: &Path,
172 expires_in: Duration,
173 ) -> crate::error::Result<PresignedRequest> {
174 let dst = prefixed_path(&self.base_dir, path);
175 let signed_url = self
176 .inner
177 .signed_url(Method::DELETE, &dst, expires_in)
178 .await
179 .into_alien_error()
180 .context(ErrorData::StorageOperationFailed {
181 binding_name: "gcp-gcs".to_string(),
182 operation: format!("generate presigned DELETE URL for {}", path),
183 })?;
184
185 let headers = HashMap::new();
186
187 Ok(PresignedRequest {
188 backend: PresignedRequestBackend::Http {
189 url: signed_url.to_string(),
190 method: "DELETE".to_string(),
191 headers,
192 },
193 expiration: Utc::now()
194 + chrono::Duration::from_std(expires_in).map_err(|e| {
195 AlienError::new(ErrorData::Other {
196 message: format!("Invalid duration: {}", e),
197 })
198 })?,
199 operation: PresignedOperation::Delete,
200 path: path.to_string(),
201 })
202 }
203}
204
205#[async_trait]
208impl ObjectStore for GcsStorage {
209 async fn put(&self, location: &Path, payload: PutPayload) -> ObjectStoreResult<PutResult> {
210 let dst = prefixed_path(&self.base_dir, location);
211 self.inner.put(&dst, payload).await
212 }
213
214 async fn put_opts(
215 &self,
216 location: &Path,
217 payload: PutPayload,
218 opts: PutOptions,
219 ) -> ObjectStoreResult<PutResult> {
220 let dst = prefixed_path(&self.base_dir, location);
221 self.inner.put_opts(&dst, payload, opts).await
222 }
223
224 async fn put_multipart(
225 &self,
226 location: &Path,
227 ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
228 let dst = prefixed_path(&self.base_dir, location);
229 self.inner.put_multipart(&dst).await
230 }
231
232 async fn put_multipart_opts(
233 &self,
234 location: &Path,
235 opts: PutMultipartOptions,
236 ) -> ObjectStoreResult<Box<dyn object_store::MultipartUpload>> {
237 let dst = prefixed_path(&self.base_dir, location);
238 self.inner.put_multipart_opts(&dst, opts).await
239 }
240
241 async fn get(&self, location: &Path) -> ObjectStoreResult<GetResult> {
242 let src = prefixed_path(&self.base_dir, location);
243 self.inner.get(&src).await
244 }
245
246 async fn get_opts(&self, location: &Path, options: GetOptions) -> ObjectStoreResult<GetResult> {
247 let src = prefixed_path(&self.base_dir, location);
248 self.inner.get_opts(&src, options).await
249 }
250
251 async fn get_range(
252 &self,
253 location: &Path,
254 range: std::ops::Range<u64>,
255 ) -> ObjectStoreResult<Bytes> {
256 let src = prefixed_path(&self.base_dir, location);
257 self.inner.get_range(&src, range).await
258 }
259
260 async fn head(&self, location: &Path) -> ObjectStoreResult<ObjectMeta> {
261 let src = prefixed_path(&self.base_dir, location);
262 let mut meta = self.inner.head(&src).await?;
263 meta.location = relativize_path(&self.base_dir, meta.location, "GcpStorage")?;
264 Ok(meta)
265 }
266
267 async fn delete(&self, location: &Path) -> ObjectStoreResult<()> {
268 let src = prefixed_path(&self.base_dir, location);
269 self.inner.delete(&src).await
270 }
271
272 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, ObjectStoreResult<ObjectMeta>> {
273 let list_prefix_for_inner = prefix
274 .map(|p| prefixed_path(&self.base_dir, p))
275 .unwrap_or_else(|| self.base_dir.clone());
276
277 let base_dir_for_stream = self.base_dir.clone();
278
279 Box::pin(
280 self.inner
281 .list(Some(&list_prefix_for_inner))
282 .and_then(move |mut meta| {
283 let captured_base_dir = base_dir_for_stream.clone();
284 async move {
285 meta.location =
286 relativize_path(&captured_base_dir, meta.location, "GcpStorage")?;
287 Ok(meta)
288 }
289 }),
290 )
291 }
292
293 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> ObjectStoreResult<ListResult> {
294 let list_prefix_for_inner = prefix
295 .map(|p| prefixed_path(&self.base_dir, p))
296 .unwrap_or_else(|| self.base_dir.clone());
297 let mut result = self
298 .inner
299 .list_with_delimiter(Some(&list_prefix_for_inner))
300 .await?;
301
302 for meta_obj in &mut result.objects {
303 let original_location = std::mem::take(&mut meta_obj.location);
304 meta_obj.location = relativize_path(&self.base_dir, original_location, "GcpStorage")?;
305 }
306
307 let mut new_common_prefixes = Vec::with_capacity(result.common_prefixes.len());
308 for cp in result.common_prefixes {
309 new_common_prefixes.push(relativize_path(&self.base_dir, cp, "GcpStorage")?);
310 }
311 result.common_prefixes = new_common_prefixes;
312
313 Ok(result)
314 }
315
316 async fn copy(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
317 let src = prefixed_path(&self.base_dir, from);
318 let dst = prefixed_path(&self.base_dir, to);
319 self.inner.copy(&src, &dst).await
320 }
321
322 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> ObjectStoreResult<()> {
323 let src = prefixed_path(&self.base_dir, from);
324 let dst = prefixed_path(&self.base_dir, to);
325 self.inner.copy_if_not_exists(&src, &dst).await
326 }
327}
328
329impl std::fmt::Display for GcsStorage {
330 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
331 write!(f, "GcpStorage(url={})", self.url)
332 }
333}