Skip to main content

alien_bindings/providers/storage/
grpc.rs

1use crate::error::ErrorData;
2use crate::grpc::storage_utils;
3use crate::grpc::MAX_GRPC_MESSAGE_SIZE;
4use crate::{
5    grpc::storage_service::alien_bindings::storage::{
6        get_response_part,
7        storage_put_multipart_chunk_request::Part as StoragePutMultipartChunkRequestPart,
8        storage_service_client::StorageServiceClient, StorageCopyRequest, StorageDeleteRequest,
9        StorageGetBaseDirRequest, StorageGetUrlRequest, StorageHeadRequest, StorageHttpMethod,
10        StorageListRequest, StorageListWithDelimiterRequest, StoragePutMultipartChunkRequest,
11        StoragePutMultipartMetadata, StoragePutRequest, StoragePutResponse, StorageRenameRequest,
12        StorageSignedUrlRequest,
13    },
14    presigned::{LocalOperation, PresignedOperation, PresignedRequest, PresignedRequestBackend},
15    traits::Binding,
16};
17
18use alien_error::AlienError;
19use alien_error::Context as _;
20use alien_error::IntoAlienError as _;
21use async_stream::try_stream;
22use async_trait::async_trait;
23use bytes::Bytes;
24use chrono::{self, Utc};
25use futures::{stream::BoxStream, StreamExt};
26use object_store::{
27    path::Path, Attributes as OsAttributes, Error as ObjectStoreError, GetOptions,
28    GetRange as OsGetRange, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
29    PutMultipartOpts, PutOptions, PutPayload, PutResult,
30};
31use prost_types;
32use std::fmt::{Debug, Formatter};
33use std::time::Duration;
34use tokio::sync::mpsc;
35use tokio::task::JoinHandle;
36use tokio_stream::wrappers::ReceiverStream;
37use tonic::{transport::Channel, Request, Status};
38use url::Url;
39
40#[derive(Debug)]
41pub struct GrpcStorage {
42    client: StorageServiceClient<Channel>,
43    binding_name: String,
44    base_dir: Path,
45    base_url: Url,
46}
47
48impl GrpcStorage {
49    pub async fn new(binding_name: String, grpc_address: String) -> crate::error::Result<Self> {
50        let channel = crate::providers::grpc_provider::create_grpc_channel(grpc_address).await?;
51        Self::new_from_channel(channel, binding_name).await
52    }
53
54    pub async fn new_from_channel(
55        channel: Channel,
56        binding_name: String,
57    ) -> crate::error::Result<Self> {
58        tracing::debug!(
59            "GrpcStorage::new_from_channel: Creating client for binding: {}",
60            binding_name
61        );
62        let mut client = StorageServiceClient::new(channel.clone())
63            .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE);
64
65        tracing::debug!(
66            "GrpcStorage::new_from_channel: Calling get_base_dir for binding: {}",
67            binding_name
68        );
69        let base_dir_req = StorageGetBaseDirRequest {
70            binding_name: binding_name.clone(),
71        };
72        let base_dir_resp = client
73            .get_base_dir(Request::new(base_dir_req))
74            .await
75            .into_alien_error()
76            .context(ErrorData::GrpcRequestFailed {
77                service: "storage".to_string(),
78                method: "get_base_dir".to_string(),
79                details: format!("Failed to get base directory for binding {}", binding_name),
80            })?
81            .into_inner();
82        let base_dir = Path::from(base_dir_resp.path);
83        tracing::debug!("GrpcStorage::new_from_channel: Got base_dir: {}", base_dir);
84
85        tracing::debug!(
86            "GrpcStorage::new_from_channel: Calling get_url for binding: {}",
87            binding_name
88        );
89        let get_url_req = StorageGetUrlRequest {
90            binding_name: binding_name.clone(),
91        };
92        let get_url_resp = client
93            .get_url(Request::new(get_url_req))
94            .await
95            .into_alien_error()
96            .context(ErrorData::GrpcRequestFailed {
97                service: "storage".to_string(),
98                method: "get_url".to_string(),
99                details: format!("Failed to get URL for binding {}", binding_name),
100            })?
101            .into_inner();
102        tracing::debug!(
103            "GrpcStorage::new_from_channel: Got url: {}",
104            get_url_resp.url
105        );
106        let base_url = Url::parse(&get_url_resp.url).into_alien_error().context(
107            ErrorData::BindingConfigInvalid {
108                binding_name: binding_name.clone(),
109                reason: format!("Invalid base_url: {}", get_url_resp.url),
110            },
111        )?;
112
113        Ok(Self {
114            client: StorageServiceClient::new(channel) // Can re-use the original channel
115                .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE)
116                .max_encoding_message_size(MAX_GRPC_MESSAGE_SIZE),
117            binding_name,
118            base_dir,
119            base_url,
120        })
121    }
122
123    fn client(&self) -> StorageServiceClient<Channel> {
124        self.client.clone()
125    }
126}
127
128impl std::fmt::Display for GrpcStorage {
129    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
130        write!(
131            f,
132            "GrpcStorage(binding='{}', base_url='{}')",
133            self.binding_name, self.base_url
134        )
135    }
136}
137
138impl Binding for GrpcStorage {}
139
140#[async_trait]
141impl crate::Storage for GrpcStorage {
142    fn get_base_dir(&self) -> Path {
143        self.base_dir.clone()
144    }
145
146    fn get_url(&self) -> Url {
147        self.base_url.clone()
148    }
149
150    async fn presigned_put(
151        &self,
152        path: &Path,
153        expires_in: Duration,
154    ) -> crate::error::Result<PresignedRequest> {
155        let mut client = self.client();
156        let expiration = Utc::now()
157            + chrono::Duration::from_std(expires_in)
158                .into_alien_error()
159                .context(ErrorData::Other {
160                    message: "Invalid duration for presigned PUT request".to_string(),
161                })?;
162
163        let request = StorageSignedUrlRequest {
164            binding_name: self.binding_name.clone(),
165            path: path.to_string(),
166            http_method: StorageHttpMethod::HttpMethodPut as i32,
167            expiration_time: Some(prost_types::Timestamp {
168                seconds: expiration.timestamp(),
169                nanos: expiration.timestamp_subsec_nanos() as i32,
170            }),
171        };
172
173        let response = client
174            .signed_url(tonic::Request::new(request))
175            .await
176            .into_alien_error()
177            .context(ErrorData::GrpcRequestFailed {
178                service: "storage".to_string(),
179                method: "signed_url".to_string(),
180                details: "Failed to generate presigned PUT URL".to_string(),
181            })?
182            .into_inner();
183
184        // Parse the returned URL to determine if it's HTTP or local
185        let url = &response.url;
186        let backend = if url.starts_with("http://") || url.starts_with("https://") {
187            // HTTP-based presigned URL (AWS S3, GCP GCS, Azure Blob)
188            let parsed_url = reqwest::Url::parse(url).into_alien_error().context(
189                ErrorData::InvalidConfigurationUrl {
190                    url: url.to_string(),
191                    reason: "Invalid presigned PUT URL format".to_string(),
192                },
193            )?;
194
195            // Extract headers from query parameters if any (some providers include them)
196            let mut headers = std::collections::HashMap::new();
197            for (key, value) in parsed_url.query_pairs() {
198                if key.starts_with("X-") || key.starts_with("x-") {
199                    headers.insert(key.to_string(), value.to_string());
200                }
201            }
202
203            PresignedRequestBackend::Http {
204                url: url.clone(),
205                method: "PUT".to_string(),
206                headers,
207            }
208        } else if url.starts_with("local://") {
209            // Local filesystem URL
210            let file_path = url.strip_prefix("local://").unwrap_or(url);
211            PresignedRequestBackend::Local {
212                file_path: file_path.to_string(),
213                operation: LocalOperation::Put,
214            }
215        } else {
216            return Err(AlienError::new(ErrorData::InvalidConfigurationUrl {
217                url: url.to_string(),
218                reason: "Unsupported presigned URL scheme".to_string(),
219            }));
220        };
221
222        Ok(PresignedRequest {
223            backend,
224            expiration,
225            operation: PresignedOperation::Put,
226            path: path.to_string(),
227        })
228    }
229
230    async fn presigned_get(
231        &self,
232        path: &Path,
233        expires_in: Duration,
234    ) -> crate::error::Result<PresignedRequest> {
235        let mut client = self.client();
236        let expiration = Utc::now()
237            + chrono::Duration::from_std(expires_in)
238                .into_alien_error()
239                .context(ErrorData::Other {
240                    message: "Invalid duration for presigned GET request".to_string(),
241                })?;
242
243        let request = StorageSignedUrlRequest {
244            binding_name: self.binding_name.clone(),
245            path: path.to_string(),
246            http_method: StorageHttpMethod::HttpMethodGet as i32,
247            expiration_time: Some(prost_types::Timestamp {
248                seconds: expiration.timestamp(),
249                nanos: expiration.timestamp_subsec_nanos() as i32,
250            }),
251        };
252
253        let response = client
254            .signed_url(tonic::Request::new(request))
255            .await
256            .into_alien_error()
257            .context(ErrorData::GrpcRequestFailed {
258                service: "storage".to_string(),
259                method: "signed_url".to_string(),
260                details: "Failed to generate presigned GET URL".to_string(),
261            })?
262            .into_inner();
263
264        // Parse the returned URL to determine if it's HTTP or local
265        let url = &response.url;
266        let backend = if url.starts_with("http://") || url.starts_with("https://") {
267            // HTTP-based presigned URL (AWS S3, GCP GCS, Azure Blob)
268            let parsed_url = reqwest::Url::parse(url).into_alien_error().context(
269                ErrorData::InvalidConfigurationUrl {
270                    url: url.to_string(),
271                    reason: "Invalid presigned GET URL format".to_string(),
272                },
273            )?;
274
275            // Extract headers from query parameters if any (some providers include them)
276            let mut headers = std::collections::HashMap::new();
277            for (key, value) in parsed_url.query_pairs() {
278                if key.starts_with("X-") || key.starts_with("x-") {
279                    headers.insert(key.to_string(), value.to_string());
280                }
281            }
282
283            PresignedRequestBackend::Http {
284                url: url.clone(),
285                method: "GET".to_string(),
286                headers,
287            }
288        } else if url.starts_with("local://") {
289            // Local filesystem URL
290            let file_path = url.strip_prefix("local://").unwrap_or(url);
291            PresignedRequestBackend::Local {
292                file_path: file_path.to_string(),
293                operation: LocalOperation::Get,
294            }
295        } else {
296            return Err(AlienError::new(ErrorData::InvalidConfigurationUrl {
297                url: url.to_string(),
298                reason: "Unsupported presigned URL scheme".to_string(),
299            }));
300        };
301
302        Ok(PresignedRequest {
303            backend,
304            expiration,
305            operation: PresignedOperation::Get,
306            path: path.to_string(),
307        })
308    }
309
310    async fn presigned_delete(
311        &self,
312        path: &Path,
313        expires_in: Duration,
314    ) -> crate::error::Result<PresignedRequest> {
315        let mut client = self.client();
316        let expiration = Utc::now()
317            + chrono::Duration::from_std(expires_in)
318                .into_alien_error()
319                .context(ErrorData::Other {
320                    message: "Invalid duration for presigned DELETE request".to_string(),
321                })?;
322
323        let request = StorageSignedUrlRequest {
324            binding_name: self.binding_name.clone(),
325            path: path.to_string(),
326            http_method: StorageHttpMethod::HttpMethodDelete as i32,
327            expiration_time: Some(prost_types::Timestamp {
328                seconds: expiration.timestamp(),
329                nanos: expiration.timestamp_subsec_nanos() as i32,
330            }),
331        };
332
333        let response = client
334            .signed_url(tonic::Request::new(request))
335            .await
336            .into_alien_error()
337            .context(ErrorData::GrpcRequestFailed {
338                service: "storage".to_string(),
339                method: "signed_url".to_string(),
340                details: "Failed to generate presigned DELETE URL".to_string(),
341            })?
342            .into_inner();
343
344        // Parse the returned URL to determine if it's HTTP or local
345        let url = &response.url;
346        let backend = if url.starts_with("http://") || url.starts_with("https://") {
347            // HTTP-based presigned URL (AWS S3, GCP GCS, Azure Blob)
348            let parsed_url = reqwest::Url::parse(url).into_alien_error().context(
349                ErrorData::InvalidConfigurationUrl {
350                    url: url.to_string(),
351                    reason: "Invalid presigned DELETE URL format".to_string(),
352                },
353            )?;
354
355            // Extract headers from query parameters if any (some providers include them)
356            let mut headers = std::collections::HashMap::new();
357            for (key, value) in parsed_url.query_pairs() {
358                if key.starts_with("X-") || key.starts_with("x-") {
359                    headers.insert(key.to_string(), value.to_string());
360                }
361            }
362
363            PresignedRequestBackend::Http {
364                url: url.clone(),
365                method: "DELETE".to_string(),
366                headers,
367            }
368        } else if url.starts_with("local://") {
369            // Local filesystem URL
370            let file_path = url.strip_prefix("local://").unwrap_or(url);
371            PresignedRequestBackend::Local {
372                file_path: file_path.to_string(),
373                operation: LocalOperation::Delete,
374            }
375        } else {
376            return Err(AlienError::new(ErrorData::InvalidConfigurationUrl {
377                url: url.to_string(),
378                reason: "Unsupported presigned URL scheme".to_string(),
379            }));
380        };
381
382        Ok(PresignedRequest {
383            backend,
384            expiration,
385            operation: PresignedOperation::Delete,
386            path: path.to_string(),
387        })
388    }
389}
390
391#[async_trait]
392impl object_store::ObjectStore for GrpcStorage {
393    async fn put_opts(
394        &self,
395        location: &Path,
396        payload: PutPayload,
397        options: PutOptions,
398    ) -> object_store::Result<PutResult> {
399        let mut client = self.client();
400        let path_str = location.to_string();
401        // PutPayload from object_store 0.11.2 can be converted to Bytes directly (synchronously)
402        let data_bytes: Bytes = payload.into();
403
404        let proto_request = StoragePutRequest {
405            binding_name: self.binding_name.clone(),
406            path: path_str.clone(),
407            data: data_bytes.into(), // prost Bytes from std Vec<u8> or bytes::Bytes
408            options: storage_utils::map_os_put_options_to_proto(options),
409        };
410
411        let response = client
412            .put(Request::new(proto_request))
413            .await
414            .map_err(|s| storage_utils::map_status_to_os_error(s, Some(path_str)))?
415            .into_inner();
416
417        Ok(PutResult {
418            e_tag: response.e_tag,
419            version: response.version,
420        })
421    }
422
423    async fn put_multipart_opts(
424        &self,
425        location: &Path,
426        opts: PutMultipartOpts,
427    ) -> object_store::Result<Box<dyn MultipartUpload>> {
428        let mut client = self.client();
429        let path_str = location.to_string();
430
431        let metadata_proto = StoragePutMultipartMetadata {
432            binding_name: self.binding_name.clone(),
433            path: path_str.clone(),
434            options: storage_utils::map_os_put_multipart_opts_to_proto(opts),
435        };
436
437        let initial_request_part = StoragePutMultipartChunkRequestPart::Metadata(metadata_proto);
438        let initial_request = StoragePutMultipartChunkRequest {
439            part: Some(initial_request_part),
440        };
441
442        let (tx, rx) = mpsc::channel::<object_store::Result<StoragePutMultipartChunkRequest>>(4);
443
444        tx.send(Ok(initial_request))
445            .await
446            .map_err(|_e| ObjectStoreError::Generic {
447                store: "GrpcClient::put_multipart_opts",
448                source: "Failed to send initial metadata for multipart upload".into(),
449            })?;
450
451        // tonic::Request::new can take a stream directly.
452        // Ensure the stream item type matches what client.put_multipart expects.
453        // The stream should yield `StoragePutMultipartChunkRequest`.
454        let request_stream = ReceiverStream::new(rx).map(|result_item| {
455            match result_item {
456                Ok(req) => req, // This is StoragePutMultipartChunkRequest
457                Err(e) => {     // This is object_store::Error
458                    // The gRPC stream expects StoragePutMultipartChunkRequest, not Result<..., object_store::Error>
459                    // If an error occurs converting a part, we should probably abort the stream from the client side.
460                    // For now, this error path in the stream mapping is problematic.
461                    // The put_part method itself returns Result, if it fails, the stream from rx will just end.
462                    // So, we should not be sending object_store::Error into this stream.
463                    // Let's assume rx only sends Ok(StoragePutMultipartChunkRequest) and ends if put_part fails.
464                    // This implies put_part failing should not try to send an error message via this stream.
465                    // This mapping needs to be infallible or handle errors differently.
466                    // For now, we assume `put_part` handles its errors and closes the stream if needed.
467                    // A simpler map that assumes `put_part` only sends `Ok` or stops:
468                    panic!("Error received in put_multipart request stream from mpsc: {:?}. This should not happen.", e);
469                }
470            }
471        });
472
473        let response_join_handle: JoinHandle<Result<tonic::Response<StoragePutResponse>, Status>> =
474            tokio::spawn(async move { client.put_multipart(Request::new(request_stream)).await });
475
476        Ok(Box::new(GrpcMultipartUpload {
477            path: location.clone(),
478            client_stream_sender: Some(tx),
479            response_join_handle: Some(response_join_handle),
480        }))
481    }
482
483    async fn get_opts(
484        &self,
485        location: &Path,
486        options: GetOptions,
487    ) -> object_store::Result<GetResult> {
488        let mut client = self.client();
489        let path_str = location.to_string();
490
491        let request_options = options.clone(); // Clone for potential later use with range
492        let proto_request = storage_utils::map_os_get_options_to_proto_request(
493            options,
494            self.binding_name.clone(),
495            path_str.clone(),
496        );
497
498        let mut grpc_stream = client
499            .get(Request::new(proto_request))
500            .await
501            .map_err(|s| storage_utils::map_status_to_os_error(s, Some(path_str.clone())))?
502            .into_inner();
503
504        let first_part_res = grpc_stream.next().await;
505        let first_part = match first_part_res {
506            Some(Ok(part)) => part,
507            Some(Err(status)) => {
508                return Err(storage_utils::map_status_to_os_error(
509                    status,
510                    Some(path_str.clone()),
511                ))
512            }
513            None => {
514                return Err(ObjectStoreError::Generic {
515                    store: "gRPC",
516                    source: "GetResponsePart stream was empty, expected metadata".into(),
517                })
518            }
519        };
520
521        let proto_meta = match first_part.part {
522            Some(get_response_part::Part::Metadata(meta)) => meta,
523            _ => {
524                return Err(ObjectStoreError::Generic {
525                    store: "gRPC",
526                    source: "First message in GetResponsePart stream was not Metadata".into(),
527                })
528            }
529        };
530        let object_meta = storage_utils::map_proto_object_meta_to_os(proto_meta)?;
531
532        let (tx, rx) = mpsc::channel::<object_store::Result<Bytes>>(4);
533        let error_path_clone = path_str.clone();
534
535        tokio::spawn(async move {
536            while let Some(stream_item_result) = grpc_stream.next().await {
537                match stream_item_result {
538                    Ok(response_part) => match response_part.part {
539                        Some(get_response_part::Part::ChunkData(data)) => {
540                            if tx.send(Ok(data.into())).await.is_err() {
541                                break;
542                            }
543                        }
544                        Some(get_response_part::Part::Metadata(_)) => {
545                            let _ = tx
546                                .send(Err(ObjectStoreError::Generic {
547                                    store: "gRPC",
548                                    source: "Received metadata again in GetResponsePart stream"
549                                        .into(),
550                                }))
551                                .await;
552                            break;
553                        }
554                        None => {
555                            let _ = tx
556                                .send(Err(ObjectStoreError::Generic {
557                                    store: "gRPC",
558                                    source: "Empty part in GetResponsePart stream".into(),
559                                }))
560                                .await;
561                            break;
562                        }
563                    },
564                    Err(status) => {
565                        let _ = tx
566                            .send(Err(storage_utils::map_status_to_os_error(
567                                status,
568                                Some(error_path_clone.clone()),
569                            )))
570                            .await;
571                        break;
572                    }
573                }
574            }
575        });
576
577        let calculated_range = request_options
578            .range
579            .map_or(0..object_meta.size, |r| match r {
580                OsGetRange::Bounded(br) => br.start..br.end,
581                OsGetRange::Offset(o) => std::cmp::min(o, object_meta.size)..object_meta.size,
582                OsGetRange::Suffix(s) => object_meta.size.saturating_sub(s)..object_meta.size,
583            });
584
585        Ok(GetResult {
586            payload: GetResultPayload::Stream(ReceiverStream::new(rx).boxed()),
587            meta: object_meta,
588            range: calculated_range,
589            attributes: OsAttributes::default(), // TODO: Map attributes from proto_meta if available
590        })
591    }
592
593    async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
594        let mut client = self.client();
595        let path_str = location.to_string();
596
597        let proto_request = StorageHeadRequest {
598            binding_name: self.binding_name.clone(),
599            path: path_str.clone(),
600        };
601
602        let response = client
603            .head(Request::new(proto_request))
604            .await
605            .map_err(|s| storage_utils::map_status_to_os_error(s, Some(path_str)))?
606            .into_inner();
607        storage_utils::map_proto_object_meta_to_os(response)
608    }
609
610    async fn delete(&self, location: &Path) -> object_store::Result<()> {
611        let mut client = self.client();
612        let path_str = location.to_string();
613        let proto_request = StorageDeleteRequest {
614            binding_name: self.binding_name.clone(),
615            path: path_str.clone(),
616        };
617        client
618            .delete(Request::new(proto_request))
619            .await
620            .map_err(|s| storage_utils::map_status_to_os_error(s, Some(path_str)))?;
621        Ok(())
622    }
623
624    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
625        let mut client = self.client();
626        let binding_name = self.binding_name.clone();
627        let prefix_path_str = prefix.map(|p| p.to_string());
628
629        try_stream! { // ensure async-stream is in Cargo.toml
630            let proto_request = StorageListRequest {
631                binding_name: binding_name.clone(),
632                prefix: prefix_path_str.clone(),
633                offset: None,
634            };
635
636            let mut stream = client.list(Request::new(proto_request)).await
637                .map_err(|s| storage_utils::map_status_to_os_error(s, prefix_path_str.clone()))?
638                .into_inner();
639
640            while let Some(item_result) = stream.next().await {
641                match item_result {
642                    Ok(proto_meta) => {
643                        yield storage_utils::map_proto_object_meta_to_os(proto_meta)?;
644                    }
645                    Err(status) => {
646                        Err(storage_utils::map_status_to_os_error(status, prefix_path_str.clone()))?;
647                    }
648                }
649            }
650        }
651        .boxed()
652    }
653
654    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
655        let mut client = self.client();
656        let path_str = prefix.map(|p| p.to_string());
657        let proto_request = StorageListWithDelimiterRequest {
658            binding_name: self.binding_name.clone(),
659            prefix: path_str.clone(),
660        };
661        let response = client
662            .list_with_delimiter(Request::new(proto_request))
663            .await
664            .map_err(|s| storage_utils::map_status_to_os_error(s, path_str))?
665            .into_inner();
666        let common_prefixes = response
667            .common_prefixes
668            .into_iter()
669            .map(Path::from)
670            .collect();
671        let objects = response
672            .objects
673            .into_iter()
674            .map(storage_utils::map_proto_object_meta_to_os)
675            .collect::<Result<Vec<_>, _>>()?;
676        Ok(ListResult {
677            common_prefixes,
678            objects,
679        })
680    }
681
682    async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
683        let mut client = self.client();
684        let from_str = from.to_string();
685        let to_str = to.to_string();
686        let proto_request = StorageCopyRequest {
687            binding_name: self.binding_name.clone(),
688            from_path: from_str.clone(),
689            to_path: to_str.clone(),
690        };
691        client
692            .copy(Request::new(proto_request))
693            .await
694            .map_err(|s| {
695                storage_utils::map_status_to_os_error(
696                    s,
697                    Some(format!("copy from {} to {}", from_str, to_str)),
698                )
699            })?;
700        Ok(())
701    }
702
703    async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
704        let mut client = self.client();
705        let from_str = from.to_string();
706        let to_str = to.to_string();
707        let proto_request = StorageRenameRequest {
708            binding_name: self.binding_name.clone(),
709            from_path: from_str.clone(),
710            to_path: to_str.clone(),
711        };
712        client
713            .rename(Request::new(proto_request))
714            .await
715            .map_err(|s| {
716                storage_utils::map_status_to_os_error(
717                    s,
718                    Some(format!("rename from {} to {}", from_str, to_str)),
719                )
720            })?;
721        Ok(())
722    }
723
724    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
725        let mut client = self.client();
726        let from_str = from.to_string();
727        let to_str = to.to_string();
728        let proto_request = StorageCopyRequest {
729            binding_name: self.binding_name.clone(),
730            from_path: from_str.clone(),
731            to_path: to_str.clone(),
732        };
733        client
734            .copy_if_not_exists(Request::new(proto_request))
735            .await
736            .map_err(|s| {
737                storage_utils::map_status_to_os_error(
738                    s,
739                    Some(format!(
740                        "copy_if_not_exists from {} to {}",
741                        from_str, to_str
742                    )),
743                )
744            })?;
745        Ok(())
746    }
747
748    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
749        let mut client = self.client();
750        let from_str = from.to_string();
751        let to_str = to.to_string();
752        let proto_request = StorageRenameRequest {
753            binding_name: self.binding_name.clone(),
754            from_path: from_str.clone(),
755            to_path: to_str.clone(),
756        };
757        client
758            .rename_if_not_exists(Request::new(proto_request))
759            .await
760            .map_err(|s| {
761                storage_utils::map_status_to_os_error(
762                    s,
763                    Some(format!(
764                        "rename_if_not_exists from {} to {}",
765                        from_str, to_str
766                    )),
767                )
768            })?;
769        Ok(())
770    }
771}
772
773#[derive(Debug)]
774struct GrpcMultipartUpload {
775    path: Path, // For error reporting
776    client_stream_sender:
777        Option<mpsc::Sender<object_store::Result<StoragePutMultipartChunkRequest>>>,
778    response_join_handle: Option<JoinHandle<Result<tonic::Response<StoragePutResponse>, Status>>>,
779}
780
781#[async_trait]
782impl MultipartUpload for GrpcMultipartUpload {
783    fn put_part(&mut self, data: PutPayload) -> object_store::UploadPart {
784        let sender_clone = match self.client_stream_sender.as_ref() {
785            Some(s) => s.clone(),
786            None => {
787                return Box::pin(async {
788                    Err(ObjectStoreError::Generic {
789                        store: "GrpcMultipartUpload::put_part",
790                        source: "Sender unavailable; put_part called after complete/abort or on failed init.".into(),
791                    })
792                });
793            }
794        };
795
796        Box::pin(async move {
797            let bytes_data: Bytes = data.into(); // Synchronous conversion
798            let chunk_data_part = StoragePutMultipartChunkRequestPart::ChunkData(bytes_data.into());
799            let request = StoragePutMultipartChunkRequest {
800                part: Some(chunk_data_part),
801            };
802
803            sender_clone
804                .send(Ok(request))
805                .await
806                .map_err(|e| ObjectStoreError::Generic {
807                    store: "GrpcMultipartUpload::put_part",
808                    source: format!("Failed to send part, gRPC call might have failed: {}", e)
809                        .into(),
810                })
811        })
812    }
813
814    async fn complete(&mut self) -> object_store::Result<PutResult> {
815        if let Some(sender) = self.client_stream_sender.take() {
816            drop(sender);
817        }
818
819        let handle = self
820            .response_join_handle
821            .take()
822            .ok_or_else(|| ObjectStoreError::Generic {
823                store: "GrpcMultipartUpload::complete",
824                source: "complete called more than once or on an already aborted/failed upload"
825                    .into(),
826            })?;
827
828        match handle.await {
829            Ok(Ok(response)) => {
830                let put_response = response.into_inner();
831                Ok(PutResult {
832                    e_tag: put_response.e_tag,
833                    version: put_response.version,
834                })
835            }
836            Ok(Err(status)) => Err(storage_utils::map_status_to_os_error(
837                status,
838                Some(self.path.to_string()),
839            )),
840            Err(join_err) => Err(ObjectStoreError::from(join_err)),
841        }
842    }
843
844    async fn abort(&mut self) -> object_store::Result<()> {
845        if let Some(sender) = self.client_stream_sender.take() {
846            drop(sender);
847        }
848
849        if let Some(handle) = self.response_join_handle.take() {
850            handle.abort();
851            match handle.await {
852                Ok(Ok(_resp)) => {
853                    // Task completed successfully even after abort signal.
854                    // Consider this as successful abortion from client's perspective.
855                    Ok(())
856                }
857                Ok(Err(status)) => {
858                    if status.code() == tonic::Code::Cancelled {
859                        Ok(()) // Expected outcome for cancellation
860                    } else {
861                        Err(storage_utils::map_status_to_os_error(
862                            status,
863                            Some(self.path.to_string()),
864                        ))
865                    }
866                }
867                Err(_join_err) => {
868                    // JoinError after abort is expected if the task panicked or was forcefully terminated.
869                    Ok(())
870                }
871            }
872        } else {
873            // Abort called after already completed or aborted.
874            Ok(())
875        }
876    }
877}