Skip to main content

alien_bindings/providers/storage/
grpc.rs

1use crate::error::ErrorData;
2use crate::grpc::storage_utils;
3use crate::grpc::MAX_GRPC_MESSAGE_SIZE;
4use crate::{
5    grpc::storage_service::alien_bindings::storage::{
6        get_response_part,
7        storage_put_multipart_chunk_request::Part as StoragePutMultipartChunkRequestPart,
8        storage_service_client::StorageServiceClient, StorageCopyRequest, StorageDeleteRequest,
9        StorageGetBaseDirRequest, StorageGetUrlRequest, StorageHeadRequest, StorageHttpMethod,
10        StorageListRequest, StorageListWithDelimiterRequest, StoragePutMultipartChunkRequest,
11        StoragePutMultipartMetadata, StoragePutRequest, StoragePutResponse, StorageRenameRequest,
12        StorageSignedUrlRequest,
13    },
14    presigned::{LocalOperation, PresignedOperation, PresignedRequest, PresignedRequestBackend},
15    traits::Binding,
16};
17
18use alien_error::AlienError;
19use alien_error::Context as _;
20use alien_error::IntoAlienError as _;
21use async_stream::try_stream;
22use async_trait::async_trait;
23use bytes::Bytes;
24use chrono::{self, Utc};
25use futures::{stream::BoxStream, StreamExt};
26use object_store::{
27    path::Path, Attributes as OsAttributes, Error as ObjectStoreError, GetOptions,
28    GetRange as OsGetRange, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
29    PutMultipartOptions, PutOptions, PutPayload, PutResult,
30};
31use prost_types;
32use std::time::Duration;
33use std::{
34    collections::HashMap,
35    fmt::{Debug, Formatter},
36};
37use tokio::sync::mpsc;
38use tokio::task::JoinHandle;
39use tokio_stream::wrappers::ReceiverStream;
40use tonic::{transport::Channel, Request, Status};
41use url::Url;
42
/// Storage binding that forwards its operations to a storage service over gRPC.
///
/// `base_dir` and `base_url` are fetched from the service once, in
/// [`GrpcStorage::new_from_channel`], and kept for the lifetime of the value.
#[derive(Debug)]
pub struct GrpcStorage {
    /// gRPC client for the storage service; `client()` hands out clones of it.
    client: StorageServiceClient<Channel>,
    /// Binding name placed in every request sent to the service.
    binding_name: String,
    /// Base directory returned by the service's `get_base_dir` call.
    base_dir: Path,
    /// Base URL returned by the service's `get_url` call.
    base_url: Url,
}
50
51impl GrpcStorage {
52    pub async fn new(binding_name: String, grpc_address: String) -> crate::error::Result<Self> {
53        let channel = crate::providers::grpc_provider::create_grpc_channel(grpc_address).await?;
54        Self::new_from_channel(channel, binding_name).await
55    }
56
57    pub async fn new_from_channel(
58        channel: Channel,
59        binding_name: String,
60    ) -> crate::error::Result<Self> {
61        tracing::debug!(
62            "GrpcStorage::new_from_channel: Creating client for binding: {}",
63            binding_name
64        );
65        let mut client = StorageServiceClient::new(channel.clone())
66            .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE);
67
68        tracing::debug!(
69            "GrpcStorage::new_from_channel: Calling get_base_dir for binding: {}",
70            binding_name
71        );
72        let base_dir_req = StorageGetBaseDirRequest {
73            binding_name: binding_name.clone(),
74        };
75        let base_dir_resp = client
76            .get_base_dir(Request::new(base_dir_req))
77            .await
78            .into_alien_error()
79            .context(ErrorData::GrpcRequestFailed {
80                service: "storage".to_string(),
81                method: "get_base_dir".to_string(),
82                details: format!("Failed to get base directory for binding {}", binding_name),
83            })?
84            .into_inner();
85        let base_dir = Path::from(base_dir_resp.path);
86        tracing::debug!("GrpcStorage::new_from_channel: Got base_dir: {}", base_dir);
87
88        tracing::debug!(
89            "GrpcStorage::new_from_channel: Calling get_url for binding: {}",
90            binding_name
91        );
92        let get_url_req = StorageGetUrlRequest {
93            binding_name: binding_name.clone(),
94        };
95        let get_url_resp = client
96            .get_url(Request::new(get_url_req))
97            .await
98            .into_alien_error()
99            .context(ErrorData::GrpcRequestFailed {
100                service: "storage".to_string(),
101                method: "get_url".to_string(),
102                details: format!("Failed to get URL for binding {}", binding_name),
103            })?
104            .into_inner();
105        tracing::debug!(
106            "GrpcStorage::new_from_channel: Got url: {}",
107            get_url_resp.url
108        );
109        let base_url = Url::parse(&get_url_resp.url).into_alien_error().context(
110            ErrorData::BindingConfigInvalid {
111                binding_name: binding_name.clone(),
112                reason: format!("Invalid base_url: {}", get_url_resp.url),
113            },
114        )?;
115
116        Ok(Self {
117            client: StorageServiceClient::new(channel) // Can re-use the original channel
118                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE)
119                .max_encoding_message_size(MAX_GRPC_MESSAGE_SIZE),
120            binding_name,
121            base_dir,
122            base_url,
123        })
124    }
125
126    fn client(&self) -> StorageServiceClient<Channel> {
127        self.client.clone()
128    }
129
130    fn http_presigned_backend(
131        url: &str,
132        method: &str,
133        headers: HashMap<String, String>,
134        invalid_url_reason: &str,
135    ) -> crate::error::Result<PresignedRequestBackend> {
136        Url::parse(url)
137            .into_alien_error()
138            .context(ErrorData::InvalidConfigurationUrl {
139                url: url.to_string(),
140                reason: invalid_url_reason.to_string(),
141            })?;
142
143        Ok(PresignedRequestBackend::Http {
144            url: url.to_string(),
145            method: method.to_string(),
146            headers,
147        })
148    }
149}
150
151impl std::fmt::Display for GrpcStorage {
152    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
153        write!(
154            f,
155            "GrpcStorage(binding='{}', base_url='{}')",
156            self.binding_name, self.base_url
157        )
158    }
159}
160
// Empty impl: no `Binding` trait items are overridden for `GrpcStorage`.
impl Binding for GrpcStorage {}
162
163#[async_trait]
164impl crate::Storage for GrpcStorage {
165    fn get_base_dir(&self) -> Path {
166        self.base_dir.clone()
167    }
168
169    fn get_url(&self) -> Url {
170        self.base_url.clone()
171    }
172
173    async fn presigned_put(
174        &self,
175        path: &Path,
176        expires_in: Duration,
177    ) -> crate::error::Result<PresignedRequest> {
178        let mut client = self.client();
179        let expiration = Utc::now()
180            + chrono::Duration::from_std(expires_in)
181                .into_alien_error()
182                .context(ErrorData::Other {
183                    message: "Invalid duration for presigned PUT request".to_string(),
184                })?;
185
186        let request = StorageSignedUrlRequest {
187            binding_name: self.binding_name.clone(),
188            path: path.to_string(),
189            http_method: StorageHttpMethod::HttpMethodPut as i32,
190            expiration_time: Some(prost_types::Timestamp {
191                seconds: expiration.timestamp(),
192                nanos: expiration.timestamp_subsec_nanos() as i32,
193            }),
194        };
195
196        let response = client
197            .signed_url(tonic::Request::new(request))
198            .await
199            .into_alien_error()
200            .context(ErrorData::GrpcRequestFailed {
201                service: "storage".to_string(),
202                method: "signed_url".to_string(),
203                details: "Failed to generate presigned PUT URL".to_string(),
204            })?
205            .into_inner();
206
207        // Parse the returned URL to determine if it's HTTP or local
208        let headers = response
209            .headers
210            .into_iter()
211            .map(|header| (header.key, header.value))
212            .collect();
213        let url = &response.url;
214        let backend = if url.starts_with("http://") || url.starts_with("https://") {
215            Self::http_presigned_backend(url, "PUT", headers, "Invalid presigned PUT URL format")?
216        } else if url.starts_with("local://") {
217            // Local filesystem URL
218            let file_path = url.strip_prefix("local://").unwrap_or(url);
219            PresignedRequestBackend::Local {
220                file_path: file_path.to_string(),
221                operation: LocalOperation::Put,
222            }
223        } else {
224            return Err(AlienError::new(ErrorData::InvalidConfigurationUrl {
225                url: url.to_string(),
226                reason: "Unsupported presigned URL scheme".to_string(),
227            }));
228        };
229
230        Ok(PresignedRequest {
231            backend,
232            expiration,
233            operation: PresignedOperation::Put,
234            path: path.to_string(),
235        })
236    }
237
238    async fn presigned_get(
239        &self,
240        path: &Path,
241        expires_in: Duration,
242    ) -> crate::error::Result<PresignedRequest> {
243        let mut client = self.client();
244        let expiration = Utc::now()
245            + chrono::Duration::from_std(expires_in)
246                .into_alien_error()
247                .context(ErrorData::Other {
248                    message: "Invalid duration for presigned GET request".to_string(),
249                })?;
250
251        let request = StorageSignedUrlRequest {
252            binding_name: self.binding_name.clone(),
253            path: path.to_string(),
254            http_method: StorageHttpMethod::HttpMethodGet as i32,
255            expiration_time: Some(prost_types::Timestamp {
256                seconds: expiration.timestamp(),
257                nanos: expiration.timestamp_subsec_nanos() as i32,
258            }),
259        };
260
261        let response = client
262            .signed_url(tonic::Request::new(request))
263            .await
264            .into_alien_error()
265            .context(ErrorData::GrpcRequestFailed {
266                service: "storage".to_string(),
267                method: "signed_url".to_string(),
268                details: "Failed to generate presigned GET URL".to_string(),
269            })?
270            .into_inner();
271
272        // Parse the returned URL to determine if it's HTTP or local
273        let headers = response
274            .headers
275            .into_iter()
276            .map(|header| (header.key, header.value))
277            .collect();
278        let url = &response.url;
279        let backend = if url.starts_with("http://") || url.starts_with("https://") {
280            Self::http_presigned_backend(url, "GET", headers, "Invalid presigned GET URL format")?
281        } else if url.starts_with("local://") {
282            // Local filesystem URL
283            let file_path = url.strip_prefix("local://").unwrap_or(url);
284            PresignedRequestBackend::Local {
285                file_path: file_path.to_string(),
286                operation: LocalOperation::Get,
287            }
288        } else {
289            return Err(AlienError::new(ErrorData::InvalidConfigurationUrl {
290                url: url.to_string(),
291                reason: "Unsupported presigned URL scheme".to_string(),
292            }));
293        };
294
295        Ok(PresignedRequest {
296            backend,
297            expiration,
298            operation: PresignedOperation::Get,
299            path: path.to_string(),
300        })
301    }
302
303    async fn presigned_delete(
304        &self,
305        path: &Path,
306        expires_in: Duration,
307    ) -> crate::error::Result<PresignedRequest> {
308        let mut client = self.client();
309        let expiration = Utc::now()
310            + chrono::Duration::from_std(expires_in)
311                .into_alien_error()
312                .context(ErrorData::Other {
313                    message: "Invalid duration for presigned DELETE request".to_string(),
314                })?;
315
316        let request = StorageSignedUrlRequest {
317            binding_name: self.binding_name.clone(),
318            path: path.to_string(),
319            http_method: StorageHttpMethod::HttpMethodDelete as i32,
320            expiration_time: Some(prost_types::Timestamp {
321                seconds: expiration.timestamp(),
322                nanos: expiration.timestamp_subsec_nanos() as i32,
323            }),
324        };
325
326        let response = client
327            .signed_url(tonic::Request::new(request))
328            .await
329            .into_alien_error()
330            .context(ErrorData::GrpcRequestFailed {
331                service: "storage".to_string(),
332                method: "signed_url".to_string(),
333                details: "Failed to generate presigned DELETE URL".to_string(),
334            })?
335            .into_inner();
336
337        // Parse the returned URL to determine if it's HTTP or local
338        let headers = response
339            .headers
340            .into_iter()
341            .map(|header| (header.key, header.value))
342            .collect();
343        let url = &response.url;
344        let backend = if url.starts_with("http://") || url.starts_with("https://") {
345            Self::http_presigned_backend(
346                url,
347                "DELETE",
348                headers,
349                "Invalid presigned DELETE URL format",
350            )?
351        } else if url.starts_with("local://") {
352            // Local filesystem URL
353            let file_path = url.strip_prefix("local://").unwrap_or(url);
354            PresignedRequestBackend::Local {
355                file_path: file_path.to_string(),
356                operation: LocalOperation::Delete,
357            }
358        } else {
359            return Err(AlienError::new(ErrorData::InvalidConfigurationUrl {
360                url: url.to_string(),
361                reason: "Unsupported presigned URL scheme".to_string(),
362            }));
363        };
364
365        Ok(PresignedRequest {
366            backend,
367            expiration,
368            operation: PresignedOperation::Delete,
369            path: path.to_string(),
370        })
371    }
372}
373
374#[async_trait]
375impl object_store::ObjectStore for GrpcStorage {
376    async fn put_opts(
377        &self,
378        location: &Path,
379        payload: PutPayload,
380        options: PutOptions,
381    ) -> object_store::Result<PutResult> {
382        let mut client = self.client();
383        let path_str = location.to_string();
384        // PutPayload from object_store 0.11.2 can be converted to Bytes directly (synchronously)
385        let data_bytes: Bytes = payload.into();
386
387        let proto_request = StoragePutRequest {
388            binding_name: self.binding_name.clone(),
389            path: path_str.clone(),
390            data: data_bytes.into(), // prost Bytes from std Vec<u8> or bytes::Bytes
391            options: storage_utils::map_os_put_options_to_proto(options),
392        };
393
394        let response = client
395            .put(Request::new(proto_request))
396            .await
397            .map_err(|s| storage_utils::map_status_to_os_error(s, Some(path_str)))?
398            .into_inner();
399
400        Ok(PutResult {
401            e_tag: response.e_tag,
402            version: response.version,
403        })
404    }
405
406    async fn put_multipart_opts(
407        &self,
408        location: &Path,
409        opts: PutMultipartOptions,
410    ) -> object_store::Result<Box<dyn MultipartUpload>> {
411        let mut client = self.client();
412        let path_str = location.to_string();
413
414        let metadata_proto = StoragePutMultipartMetadata {
415            binding_name: self.binding_name.clone(),
416            path: path_str.clone(),
417            options: storage_utils::map_os_put_multipart_opts_to_proto(opts),
418        };
419
420        let initial_request_part = StoragePutMultipartChunkRequestPart::Metadata(metadata_proto);
421        let initial_request = StoragePutMultipartChunkRequest {
422            part: Some(initial_request_part),
423        };
424
425        let (tx, rx) = mpsc::channel::<object_store::Result<StoragePutMultipartChunkRequest>>(4);
426
427        tx.send(Ok(initial_request))
428            .await
429            .map_err(|_e| ObjectStoreError::Generic {
430                store: "GrpcClient::put_multipart_opts",
431                source: "Failed to send initial metadata for multipart upload".into(),
432            })?;
433
434        // tonic::Request::new can take a stream directly.
435        // Ensure the stream item type matches what client.put_multipart expects.
436        // The stream should yield `StoragePutMultipartChunkRequest`.
437        let request_stream = ReceiverStream::new(rx).map(|result_item| {
438            match result_item {
439                Ok(req) => req, // This is StoragePutMultipartChunkRequest
440                Err(e) => {     // This is object_store::Error
441                    // The gRPC stream expects StoragePutMultipartChunkRequest, not Result<..., object_store::Error>
442                    // If an error occurs converting a part, we should probably abort the stream from the client side.
443                    // For now, this error path in the stream mapping is problematic.
444                    // The put_part method itself returns Result, if it fails, the stream from rx will just end.
445                    // So, we should not be sending object_store::Error into this stream.
446                    // Let's assume rx only sends Ok(StoragePutMultipartChunkRequest) and ends if put_part fails.
447                    // This implies put_part failing should not try to send an error message via this stream.
448                    // This mapping needs to be infallible or handle errors differently.
449                    // For now, we assume `put_part` handles its errors and closes the stream if needed.
450                    // A simpler map that assumes `put_part` only sends `Ok` or stops:
451                    panic!("Error received in put_multipart request stream from mpsc: {:?}. This should not happen.", e);
452                }
453            }
454        });
455
456        let response_join_handle: JoinHandle<Result<tonic::Response<StoragePutResponse>, Status>> =
457            tokio::spawn(async move { client.put_multipart(Request::new(request_stream)).await });
458
459        Ok(Box::new(GrpcMultipartUpload {
460            path: location.clone(),
461            client_stream_sender: Some(tx),
462            response_join_handle: Some(response_join_handle),
463        }))
464    }
465
466    async fn get_opts(
467        &self,
468        location: &Path,
469        options: GetOptions,
470    ) -> object_store::Result<GetResult> {
471        let mut client = self.client();
472        let path_str = location.to_string();
473
474        let request_options = options.clone(); // Clone for potential later use with range
475        let proto_request = storage_utils::map_os_get_options_to_proto_request(
476            options,
477            self.binding_name.clone(),
478            path_str.clone(),
479        );
480
481        let mut grpc_stream = client
482            .get(Request::new(proto_request))
483            .await
484            .map_err(|s| storage_utils::map_status_to_os_error(s, Some(path_str.clone())))?
485            .into_inner();
486
487        let first_part_res = grpc_stream.next().await;
488        let first_part = match first_part_res {
489            Some(Ok(part)) => part,
490            Some(Err(status)) => {
491                return Err(storage_utils::map_status_to_os_error(
492                    status,
493                    Some(path_str.clone()),
494                ))
495            }
496            None => {
497                return Err(ObjectStoreError::Generic {
498                    store: "gRPC",
499                    source: "GetResponsePart stream was empty, expected metadata".into(),
500                })
501            }
502        };
503
504        let proto_meta = match first_part.part {
505            Some(get_response_part::Part::Metadata(meta)) => meta,
506            _ => {
507                return Err(ObjectStoreError::Generic {
508                    store: "gRPC",
509                    source: "First message in GetResponsePart stream was not Metadata".into(),
510                })
511            }
512        };
513        let object_meta = storage_utils::map_proto_object_meta_to_os(proto_meta)?;
514
515        let (tx, rx) = mpsc::channel::<object_store::Result<Bytes>>(4);
516        let error_path_clone = path_str.clone();
517
518        tokio::spawn(async move {
519            while let Some(stream_item_result) = grpc_stream.next().await {
520                match stream_item_result {
521                    Ok(response_part) => match response_part.part {
522                        Some(get_response_part::Part::ChunkData(data)) => {
523                            if tx.send(Ok(data.into())).await.is_err() {
524                                break;
525                            }
526                        }
527                        Some(get_response_part::Part::Metadata(_)) => {
528                            let _ = tx
529                                .send(Err(ObjectStoreError::Generic {
530                                    store: "gRPC",
531                                    source: "Received metadata again in GetResponsePart stream"
532                                        .into(),
533                                }))
534                                .await;
535                            break;
536                        }
537                        None => {
538                            let _ = tx
539                                .send(Err(ObjectStoreError::Generic {
540                                    store: "gRPC",
541                                    source: "Empty part in GetResponsePart stream".into(),
542                                }))
543                                .await;
544                            break;
545                        }
546                    },
547                    Err(status) => {
548                        let _ = tx
549                            .send(Err(storage_utils::map_status_to_os_error(
550                                status,
551                                Some(error_path_clone.clone()),
552                            )))
553                            .await;
554                        break;
555                    }
556                }
557            }
558        });
559
560        let calculated_range = request_options
561            .range
562            .map_or(0..object_meta.size, |r| match r {
563                OsGetRange::Bounded(br) => br.start..br.end,
564                OsGetRange::Offset(o) => std::cmp::min(o, object_meta.size)..object_meta.size,
565                OsGetRange::Suffix(s) => object_meta.size.saturating_sub(s)..object_meta.size,
566            });
567
568        Ok(GetResult {
569            payload: GetResultPayload::Stream(ReceiverStream::new(rx).boxed()),
570            meta: object_meta,
571            range: calculated_range,
572            attributes: OsAttributes::default(), // TODO: Map attributes from proto_meta if available
573        })
574    }
575
576    async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
577        let mut client = self.client();
578        let path_str = location.to_string();
579
580        let proto_request = StorageHeadRequest {
581            binding_name: self.binding_name.clone(),
582            path: path_str.clone(),
583        };
584
585        let response = client
586            .head(Request::new(proto_request))
587            .await
588            .map_err(|s| storage_utils::map_status_to_os_error(s, Some(path_str)))?
589            .into_inner();
590        storage_utils::map_proto_object_meta_to_os(response)
591    }
592
593    async fn delete(&self, location: &Path) -> object_store::Result<()> {
594        let mut client = self.client();
595        let path_str = location.to_string();
596        let proto_request = StorageDeleteRequest {
597            binding_name: self.binding_name.clone(),
598            path: path_str.clone(),
599        };
600        client
601            .delete(Request::new(proto_request))
602            .await
603            .map_err(|s| storage_utils::map_status_to_os_error(s, Some(path_str)))?;
604        Ok(())
605    }
606
607    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
608        let mut client = self.client();
609        let binding_name = self.binding_name.clone();
610        let prefix_path_str = prefix.map(|p| p.to_string());
611
612        try_stream! { // ensure async-stream is in Cargo.toml
613            let proto_request = StorageListRequest {
614                binding_name: binding_name.clone(),
615                prefix: prefix_path_str.clone(),
616                offset: None,
617            };
618
619            let mut stream = client.list(Request::new(proto_request)).await
620                .map_err(|s| storage_utils::map_status_to_os_error(s, prefix_path_str.clone()))?
621                .into_inner();
622
623            while let Some(item_result) = stream.next().await {
624                match item_result {
625                    Ok(proto_meta) => {
626                        yield storage_utils::map_proto_object_meta_to_os(proto_meta)?;
627                    }
628                    Err(status) => {
629                        Err(storage_utils::map_status_to_os_error(status, prefix_path_str.clone()))?;
630                    }
631                }
632            }
633        }
634        .boxed()
635    }
636
637    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
638        let mut client = self.client();
639        let path_str = prefix.map(|p| p.to_string());
640        let proto_request = StorageListWithDelimiterRequest {
641            binding_name: self.binding_name.clone(),
642            prefix: path_str.clone(),
643        };
644        let response = client
645            .list_with_delimiter(Request::new(proto_request))
646            .await
647            .map_err(|s| storage_utils::map_status_to_os_error(s, path_str))?
648            .into_inner();
649        let common_prefixes = response
650            .common_prefixes
651            .into_iter()
652            .map(Path::from)
653            .collect();
654        let objects = response
655            .objects
656            .into_iter()
657            .map(storage_utils::map_proto_object_meta_to_os)
658            .collect::<Result<Vec<_>, _>>()?;
659        Ok(ListResult {
660            common_prefixes,
661            objects,
662        })
663    }
664
665    async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
666        let mut client = self.client();
667        let from_str = from.to_string();
668        let to_str = to.to_string();
669        let proto_request = StorageCopyRequest {
670            binding_name: self.binding_name.clone(),
671            from_path: from_str.clone(),
672            to_path: to_str.clone(),
673        };
674        client
675            .copy(Request::new(proto_request))
676            .await
677            .map_err(|s| {
678                storage_utils::map_status_to_os_error(
679                    s,
680                    Some(format!("copy from {} to {}", from_str, to_str)),
681                )
682            })?;
683        Ok(())
684    }
685
686    async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
687        let mut client = self.client();
688        let from_str = from.to_string();
689        let to_str = to.to_string();
690        let proto_request = StorageRenameRequest {
691            binding_name: self.binding_name.clone(),
692            from_path: from_str.clone(),
693            to_path: to_str.clone(),
694        };
695        client
696            .rename(Request::new(proto_request))
697            .await
698            .map_err(|s| {
699                storage_utils::map_status_to_os_error(
700                    s,
701                    Some(format!("rename from {} to {}", from_str, to_str)),
702                )
703            })?;
704        Ok(())
705    }
706
707    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
708        let mut client = self.client();
709        let from_str = from.to_string();
710        let to_str = to.to_string();
711        let proto_request = StorageCopyRequest {
712            binding_name: self.binding_name.clone(),
713            from_path: from_str.clone(),
714            to_path: to_str.clone(),
715        };
716        client
717            .copy_if_not_exists(Request::new(proto_request))
718            .await
719            .map_err(|s| {
720                storage_utils::map_status_to_os_error(
721                    s,
722                    Some(format!(
723                        "copy_if_not_exists from {} to {}",
724                        from_str, to_str
725                    )),
726                )
727            })?;
728        Ok(())
729    }
730
731    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
732        let mut client = self.client();
733        let from_str = from.to_string();
734        let to_str = to.to_string();
735        let proto_request = StorageRenameRequest {
736            binding_name: self.binding_name.clone(),
737            from_path: from_str.clone(),
738            to_path: to_str.clone(),
739        };
740        client
741            .rename_if_not_exists(Request::new(proto_request))
742            .await
743            .map_err(|s| {
744                storage_utils::map_status_to_os_error(
745                    s,
746                    Some(format!(
747                        "rename_if_not_exists from {} to {}",
748                        from_str, to_str
749                    )),
750                )
751            })?;
752        Ok(())
753    }
754}
755
#[cfg(test)]
mod tests {
    use super::*;

    /// Unwraps the HTTP variant into `(url, method, headers)`, failing the
    /// test for any other backend.
    fn into_http_parts(
        backend: PresignedRequestBackend,
    ) -> (String, String, HashMap<String, String>) {
        match backend {
            PresignedRequestBackend::Http {
                url,
                method,
                headers,
            } => (url, method, headers),
            PresignedRequestBackend::Local { .. } => panic!("expected HTTP backend"),
        }
    }

    /// Query-string credentials stay in the URL and are not turned into headers.
    #[test]
    fn http_presigned_backend_keeps_query_auth_out_of_headers() {
        let signed_url = "https://bucket.s3.us-east-2.amazonaws.com/key\
            ?X-Amz-Algorithm=AWS4-HMAC-SHA256\
            &X-Amz-Credential=AKIA%2F20260527%2Fus-east-2%2Fs3%2Faws4_request\
            &X-Amz-Date=20260527T173759Z\
            &X-Amz-Expires=300\
            &X-Amz-SignedHeaders=host\
            &X-Amz-Signature=abc123";

        let built = GrpcStorage::http_presigned_backend(
            signed_url,
            "PUT",
            HashMap::new(),
            "Invalid presigned PUT URL format",
        );
        let (url, method, headers) = into_http_parts(built.expect("valid signed URL"));

        assert_eq!(url, signed_url);
        assert_eq!(method, "PUT");
        assert!(headers.is_empty());
    }

    /// Headers supplied by the provider are carried into the backend unchanged.
    #[test]
    fn http_presigned_backend_preserves_provider_headers() {
        let signed_url = "https://account.blob.core.windows.net/container/key?sas=token";
        let provider_headers: HashMap<String, String> =
            std::iter::once(("x-ms-blob-type".to_string(), "BlockBlob".to_string())).collect();

        let built = GrpcStorage::http_presigned_backend(
            signed_url,
            "PUT",
            provider_headers,
            "Invalid presigned PUT URL format",
        );
        let (_, _, headers) = into_http_parts(built.expect("valid signed URL"));

        assert_eq!(
            headers.get("x-ms-blob-type"),
            Some(&"BlockBlob".to_string())
        );
    }
}
817
818#[derive(Debug)]
819struct GrpcMultipartUpload {
820    path: Path, // For error reporting
821    client_stream_sender:
822        Option<mpsc::Sender<object_store::Result<StoragePutMultipartChunkRequest>>>,
823    response_join_handle: Option<JoinHandle<Result<tonic::Response<StoragePutResponse>, Status>>>,
824}
825
826#[async_trait]
827impl MultipartUpload for GrpcMultipartUpload {
828    fn put_part(&mut self, data: PutPayload) -> object_store::UploadPart {
829        let sender_clone = match self.client_stream_sender.as_ref() {
830            Some(s) => s.clone(),
831            None => {
832                return Box::pin(async {
833                    Err(ObjectStoreError::Generic {
834                        store: "GrpcMultipartUpload::put_part",
835                        source: "Sender unavailable; put_part called after complete/abort or on failed init.".into(),
836                    })
837                });
838            }
839        };
840
841        Box::pin(async move {
842            let bytes_data: Bytes = data.into(); // Synchronous conversion
843            let chunk_data_part = StoragePutMultipartChunkRequestPart::ChunkData(bytes_data.into());
844            let request = StoragePutMultipartChunkRequest {
845                part: Some(chunk_data_part),
846            };
847
848            sender_clone
849                .send(Ok(request))
850                .await
851                .map_err(|e| ObjectStoreError::Generic {
852                    store: "GrpcMultipartUpload::put_part",
853                    source: format!("Failed to send part, gRPC call might have failed: {}", e)
854                        .into(),
855                })
856        })
857    }
858
859    async fn complete(&mut self) -> object_store::Result<PutResult> {
860        if let Some(sender) = self.client_stream_sender.take() {
861            drop(sender);
862        }
863
864        let handle = self
865            .response_join_handle
866            .take()
867            .ok_or_else(|| ObjectStoreError::Generic {
868                store: "GrpcMultipartUpload::complete",
869                source: "complete called more than once or on an already aborted/failed upload"
870                    .into(),
871            })?;
872
873        match handle.await {
874            Ok(Ok(response)) => {
875                let put_response = response.into_inner();
876                Ok(PutResult {
877                    e_tag: put_response.e_tag,
878                    version: put_response.version,
879                })
880            }
881            Ok(Err(status)) => Err(storage_utils::map_status_to_os_error(
882                status,
883                Some(self.path.to_string()),
884            )),
885            Err(join_err) => Err(ObjectStoreError::from(join_err)),
886        }
887    }
888
889    async fn abort(&mut self) -> object_store::Result<()> {
890        if let Some(sender) = self.client_stream_sender.take() {
891            drop(sender);
892        }
893
894        if let Some(handle) = self.response_join_handle.take() {
895            handle.abort();
896            match handle.await {
897                Ok(Ok(_resp)) => {
898                    // Task completed successfully even after abort signal.
899                    // Consider this as successful abortion from client's perspective.
900                    Ok(())
901                }
902                Ok(Err(status)) => {
903                    if status.code() == tonic::Code::Cancelled {
904                        Ok(()) // Expected outcome for cancellation
905                    } else {
906                        Err(storage_utils::map_status_to_os_error(
907                            status,
908                            Some(self.path.to_string()),
909                        ))
910                    }
911                }
912                Err(_join_err) => {
913                    // JoinError after abort is expected if the task panicked or was forcefully terminated.
914                    Ok(())
915                }
916            }
917        } else {
918            // Abort called after already completed or aborted.
919            Ok(())
920        }
921    }
922}