1use crate::error::ErrorData;
2use crate::grpc::storage_utils;
3use crate::grpc::MAX_GRPC_MESSAGE_SIZE;
4use crate::{
5 grpc::storage_service::alien_bindings::storage::{
6 get_response_part,
7 storage_put_multipart_chunk_request::Part as StoragePutMultipartChunkRequestPart,
8 storage_service_client::StorageServiceClient, StorageCopyRequest, StorageDeleteRequest,
9 StorageGetBaseDirRequest, StorageGetUrlRequest, StorageHeadRequest, StorageHttpMethod,
10 StorageListRequest, StorageListWithDelimiterRequest, StoragePutMultipartChunkRequest,
11 StoragePutMultipartMetadata, StoragePutRequest, StoragePutResponse, StorageRenameRequest,
12 StorageSignedUrlRequest,
13 },
14 presigned::{LocalOperation, PresignedOperation, PresignedRequest, PresignedRequestBackend},
15 traits::Binding,
16};
17
18use alien_error::AlienError;
19use alien_error::Context as _;
20use alien_error::IntoAlienError as _;
21use async_stream::try_stream;
22use async_trait::async_trait;
23use bytes::Bytes;
24use chrono::{self, Utc};
25use futures::{stream::BoxStream, StreamExt};
26use object_store::{
27 path::Path, Attributes as OsAttributes, Error as ObjectStoreError, GetOptions,
28 GetRange as OsGetRange, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
29 PutMultipartOpts, PutOptions, PutPayload, PutResult,
30};
31use prost_types;
32use std::fmt::{Debug, Formatter};
33use std::time::Duration;
34use tokio::sync::mpsc;
35use tokio::task::JoinHandle;
36use tokio_stream::wrappers::ReceiverStream;
37use tonic::{transport::Channel, Request, Status};
38use url::Url;
39
40#[derive(Debug)]
41pub struct GrpcStorage {
42 client: StorageServiceClient<Channel>,
43 binding_name: String,
44 base_dir: Path,
45 base_url: Url,
46}
47
48impl GrpcStorage {
49 pub async fn new(binding_name: String, grpc_address: String) -> crate::error::Result<Self> {
50 let channel = crate::providers::grpc_provider::create_grpc_channel(grpc_address).await?;
51 Self::new_from_channel(channel, binding_name).await
52 }
53
54 pub async fn new_from_channel(
55 channel: Channel,
56 binding_name: String,
57 ) -> crate::error::Result<Self> {
58 tracing::debug!(
59 "GrpcStorage::new_from_channel: Creating client for binding: {}",
60 binding_name
61 );
62 let mut client = StorageServiceClient::new(channel.clone())
63 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE);
64
65 tracing::debug!(
66 "GrpcStorage::new_from_channel: Calling get_base_dir for binding: {}",
67 binding_name
68 );
69 let base_dir_req = StorageGetBaseDirRequest {
70 binding_name: binding_name.clone(),
71 };
72 let base_dir_resp = client
73 .get_base_dir(Request::new(base_dir_req))
74 .await
75 .into_alien_error()
76 .context(ErrorData::GrpcRequestFailed {
77 service: "storage".to_string(),
78 method: "get_base_dir".to_string(),
79 details: format!("Failed to get base directory for binding {}", binding_name),
80 })?
81 .into_inner();
82 let base_dir = Path::from(base_dir_resp.path);
83 tracing::debug!("GrpcStorage::new_from_channel: Got base_dir: {}", base_dir);
84
85 tracing::debug!(
86 "GrpcStorage::new_from_channel: Calling get_url for binding: {}",
87 binding_name
88 );
89 let get_url_req = StorageGetUrlRequest {
90 binding_name: binding_name.clone(),
91 };
92 let get_url_resp = client
93 .get_url(Request::new(get_url_req))
94 .await
95 .into_alien_error()
96 .context(ErrorData::GrpcRequestFailed {
97 service: "storage".to_string(),
98 method: "get_url".to_string(),
99 details: format!("Failed to get URL for binding {}", binding_name),
100 })?
101 .into_inner();
102 tracing::debug!(
103 "GrpcStorage::new_from_channel: Got url: {}",
104 get_url_resp.url
105 );
106 let base_url = Url::parse(&get_url_resp.url).into_alien_error().context(
107 ErrorData::BindingConfigInvalid {
108 binding_name: binding_name.clone(),
109 reason: format!("Invalid base_url: {}", get_url_resp.url),
110 },
111 )?;
112
113 Ok(Self {
114 client: StorageServiceClient::new(channel) .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE)
116 .max_encoding_message_size(MAX_GRPC_MESSAGE_SIZE),
117 binding_name,
118 base_dir,
119 base_url,
120 })
121 }
122
123 fn client(&self) -> StorageServiceClient<Channel> {
124 self.client.clone()
125 }
126}
127
128impl std::fmt::Display for GrpcStorage {
129 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
130 write!(
131 f,
132 "GrpcStorage(binding='{}', base_url='{}')",
133 self.binding_name, self.base_url
134 )
135 }
136}
137
138impl Binding for GrpcStorage {}
139
140#[async_trait]
141impl crate::Storage for GrpcStorage {
142 fn get_base_dir(&self) -> Path {
143 self.base_dir.clone()
144 }
145
146 fn get_url(&self) -> Url {
147 self.base_url.clone()
148 }
149
150 async fn presigned_put(
151 &self,
152 path: &Path,
153 expires_in: Duration,
154 ) -> crate::error::Result<PresignedRequest> {
155 let mut client = self.client();
156 let expiration = Utc::now()
157 + chrono::Duration::from_std(expires_in)
158 .into_alien_error()
159 .context(ErrorData::Other {
160 message: "Invalid duration for presigned PUT request".to_string(),
161 })?;
162
163 let request = StorageSignedUrlRequest {
164 binding_name: self.binding_name.clone(),
165 path: path.to_string(),
166 http_method: StorageHttpMethod::HttpMethodPut as i32,
167 expiration_time: Some(prost_types::Timestamp {
168 seconds: expiration.timestamp(),
169 nanos: expiration.timestamp_subsec_nanos() as i32,
170 }),
171 };
172
173 let response = client
174 .signed_url(tonic::Request::new(request))
175 .await
176 .into_alien_error()
177 .context(ErrorData::GrpcRequestFailed {
178 service: "storage".to_string(),
179 method: "signed_url".to_string(),
180 details: "Failed to generate presigned PUT URL".to_string(),
181 })?
182 .into_inner();
183
184 let url = &response.url;
186 let backend = if url.starts_with("http://") || url.starts_with("https://") {
187 let parsed_url = reqwest::Url::parse(url).into_alien_error().context(
189 ErrorData::InvalidConfigurationUrl {
190 url: url.to_string(),
191 reason: "Invalid presigned PUT URL format".to_string(),
192 },
193 )?;
194
195 let mut headers = std::collections::HashMap::new();
197 for (key, value) in parsed_url.query_pairs() {
198 if key.starts_with("X-") || key.starts_with("x-") {
199 headers.insert(key.to_string(), value.to_string());
200 }
201 }
202
203 PresignedRequestBackend::Http {
204 url: url.clone(),
205 method: "PUT".to_string(),
206 headers,
207 }
208 } else if url.starts_with("local://") {
209 let file_path = url.strip_prefix("local://").unwrap_or(url);
211 PresignedRequestBackend::Local {
212 file_path: file_path.to_string(),
213 operation: LocalOperation::Put,
214 }
215 } else {
216 return Err(AlienError::new(ErrorData::InvalidConfigurationUrl {
217 url: url.to_string(),
218 reason: "Unsupported presigned URL scheme".to_string(),
219 }));
220 };
221
222 Ok(PresignedRequest {
223 backend,
224 expiration,
225 operation: PresignedOperation::Put,
226 path: path.to_string(),
227 })
228 }
229
230 async fn presigned_get(
231 &self,
232 path: &Path,
233 expires_in: Duration,
234 ) -> crate::error::Result<PresignedRequest> {
235 let mut client = self.client();
236 let expiration = Utc::now()
237 + chrono::Duration::from_std(expires_in)
238 .into_alien_error()
239 .context(ErrorData::Other {
240 message: "Invalid duration for presigned GET request".to_string(),
241 })?;
242
243 let request = StorageSignedUrlRequest {
244 binding_name: self.binding_name.clone(),
245 path: path.to_string(),
246 http_method: StorageHttpMethod::HttpMethodGet as i32,
247 expiration_time: Some(prost_types::Timestamp {
248 seconds: expiration.timestamp(),
249 nanos: expiration.timestamp_subsec_nanos() as i32,
250 }),
251 };
252
253 let response = client
254 .signed_url(tonic::Request::new(request))
255 .await
256 .into_alien_error()
257 .context(ErrorData::GrpcRequestFailed {
258 service: "storage".to_string(),
259 method: "signed_url".to_string(),
260 details: "Failed to generate presigned GET URL".to_string(),
261 })?
262 .into_inner();
263
264 let url = &response.url;
266 let backend = if url.starts_with("http://") || url.starts_with("https://") {
267 let parsed_url = reqwest::Url::parse(url).into_alien_error().context(
269 ErrorData::InvalidConfigurationUrl {
270 url: url.to_string(),
271 reason: "Invalid presigned GET URL format".to_string(),
272 },
273 )?;
274
275 let mut headers = std::collections::HashMap::new();
277 for (key, value) in parsed_url.query_pairs() {
278 if key.starts_with("X-") || key.starts_with("x-") {
279 headers.insert(key.to_string(), value.to_string());
280 }
281 }
282
283 PresignedRequestBackend::Http {
284 url: url.clone(),
285 method: "GET".to_string(),
286 headers,
287 }
288 } else if url.starts_with("local://") {
289 let file_path = url.strip_prefix("local://").unwrap_or(url);
291 PresignedRequestBackend::Local {
292 file_path: file_path.to_string(),
293 operation: LocalOperation::Get,
294 }
295 } else {
296 return Err(AlienError::new(ErrorData::InvalidConfigurationUrl {
297 url: url.to_string(),
298 reason: "Unsupported presigned URL scheme".to_string(),
299 }));
300 };
301
302 Ok(PresignedRequest {
303 backend,
304 expiration,
305 operation: PresignedOperation::Get,
306 path: path.to_string(),
307 })
308 }
309
310 async fn presigned_delete(
311 &self,
312 path: &Path,
313 expires_in: Duration,
314 ) -> crate::error::Result<PresignedRequest> {
315 let mut client = self.client();
316 let expiration = Utc::now()
317 + chrono::Duration::from_std(expires_in)
318 .into_alien_error()
319 .context(ErrorData::Other {
320 message: "Invalid duration for presigned DELETE request".to_string(),
321 })?;
322
323 let request = StorageSignedUrlRequest {
324 binding_name: self.binding_name.clone(),
325 path: path.to_string(),
326 http_method: StorageHttpMethod::HttpMethodDelete as i32,
327 expiration_time: Some(prost_types::Timestamp {
328 seconds: expiration.timestamp(),
329 nanos: expiration.timestamp_subsec_nanos() as i32,
330 }),
331 };
332
333 let response = client
334 .signed_url(tonic::Request::new(request))
335 .await
336 .into_alien_error()
337 .context(ErrorData::GrpcRequestFailed {
338 service: "storage".to_string(),
339 method: "signed_url".to_string(),
340 details: "Failed to generate presigned DELETE URL".to_string(),
341 })?
342 .into_inner();
343
344 let url = &response.url;
346 let backend = if url.starts_with("http://") || url.starts_with("https://") {
347 let parsed_url = reqwest::Url::parse(url).into_alien_error().context(
349 ErrorData::InvalidConfigurationUrl {
350 url: url.to_string(),
351 reason: "Invalid presigned DELETE URL format".to_string(),
352 },
353 )?;
354
355 let mut headers = std::collections::HashMap::new();
357 for (key, value) in parsed_url.query_pairs() {
358 if key.starts_with("X-") || key.starts_with("x-") {
359 headers.insert(key.to_string(), value.to_string());
360 }
361 }
362
363 PresignedRequestBackend::Http {
364 url: url.clone(),
365 method: "DELETE".to_string(),
366 headers,
367 }
368 } else if url.starts_with("local://") {
369 let file_path = url.strip_prefix("local://").unwrap_or(url);
371 PresignedRequestBackend::Local {
372 file_path: file_path.to_string(),
373 operation: LocalOperation::Delete,
374 }
375 } else {
376 return Err(AlienError::new(ErrorData::InvalidConfigurationUrl {
377 url: url.to_string(),
378 reason: "Unsupported presigned URL scheme".to_string(),
379 }));
380 };
381
382 Ok(PresignedRequest {
383 backend,
384 expiration,
385 operation: PresignedOperation::Delete,
386 path: path.to_string(),
387 })
388 }
389}
390
391#[async_trait]
392impl object_store::ObjectStore for GrpcStorage {
393 async fn put_opts(
394 &self,
395 location: &Path,
396 payload: PutPayload,
397 options: PutOptions,
398 ) -> object_store::Result<PutResult> {
399 let mut client = self.client();
400 let path_str = location.to_string();
401 let data_bytes: Bytes = payload.into();
403
404 let proto_request = StoragePutRequest {
405 binding_name: self.binding_name.clone(),
406 path: path_str.clone(),
407 data: data_bytes.into(), options: storage_utils::map_os_put_options_to_proto(options),
409 };
410
411 let response = client
412 .put(Request::new(proto_request))
413 .await
414 .map_err(|s| storage_utils::map_status_to_os_error(s, Some(path_str)))?
415 .into_inner();
416
417 Ok(PutResult {
418 e_tag: response.e_tag,
419 version: response.version,
420 })
421 }
422
423 async fn put_multipart_opts(
424 &self,
425 location: &Path,
426 opts: PutMultipartOpts,
427 ) -> object_store::Result<Box<dyn MultipartUpload>> {
428 let mut client = self.client();
429 let path_str = location.to_string();
430
431 let metadata_proto = StoragePutMultipartMetadata {
432 binding_name: self.binding_name.clone(),
433 path: path_str.clone(),
434 options: storage_utils::map_os_put_multipart_opts_to_proto(opts),
435 };
436
437 let initial_request_part = StoragePutMultipartChunkRequestPart::Metadata(metadata_proto);
438 let initial_request = StoragePutMultipartChunkRequest {
439 part: Some(initial_request_part),
440 };
441
442 let (tx, rx) = mpsc::channel::<object_store::Result<StoragePutMultipartChunkRequest>>(4);
443
444 tx.send(Ok(initial_request))
445 .await
446 .map_err(|_e| ObjectStoreError::Generic {
447 store: "GrpcClient::put_multipart_opts",
448 source: "Failed to send initial metadata for multipart upload".into(),
449 })?;
450
451 let request_stream = ReceiverStream::new(rx).map(|result_item| {
455 match result_item {
456 Ok(req) => req, Err(e) => { panic!("Error received in put_multipart request stream from mpsc: {:?}. This should not happen.", e);
469 }
470 }
471 });
472
473 let response_join_handle: JoinHandle<Result<tonic::Response<StoragePutResponse>, Status>> =
474 tokio::spawn(async move { client.put_multipart(Request::new(request_stream)).await });
475
476 Ok(Box::new(GrpcMultipartUpload {
477 path: location.clone(),
478 client_stream_sender: Some(tx),
479 response_join_handle: Some(response_join_handle),
480 }))
481 }
482
483 async fn get_opts(
484 &self,
485 location: &Path,
486 options: GetOptions,
487 ) -> object_store::Result<GetResult> {
488 let mut client = self.client();
489 let path_str = location.to_string();
490
491 let request_options = options.clone(); let proto_request = storage_utils::map_os_get_options_to_proto_request(
493 options,
494 self.binding_name.clone(),
495 path_str.clone(),
496 );
497
498 let mut grpc_stream = client
499 .get(Request::new(proto_request))
500 .await
501 .map_err(|s| storage_utils::map_status_to_os_error(s, Some(path_str.clone())))?
502 .into_inner();
503
504 let first_part_res = grpc_stream.next().await;
505 let first_part = match first_part_res {
506 Some(Ok(part)) => part,
507 Some(Err(status)) => {
508 return Err(storage_utils::map_status_to_os_error(
509 status,
510 Some(path_str.clone()),
511 ))
512 }
513 None => {
514 return Err(ObjectStoreError::Generic {
515 store: "gRPC",
516 source: "GetResponsePart stream was empty, expected metadata".into(),
517 })
518 }
519 };
520
521 let proto_meta = match first_part.part {
522 Some(get_response_part::Part::Metadata(meta)) => meta,
523 _ => {
524 return Err(ObjectStoreError::Generic {
525 store: "gRPC",
526 source: "First message in GetResponsePart stream was not Metadata".into(),
527 })
528 }
529 };
530 let object_meta = storage_utils::map_proto_object_meta_to_os(proto_meta)?;
531
532 let (tx, rx) = mpsc::channel::<object_store::Result<Bytes>>(4);
533 let error_path_clone = path_str.clone();
534
535 tokio::spawn(async move {
536 while let Some(stream_item_result) = grpc_stream.next().await {
537 match stream_item_result {
538 Ok(response_part) => match response_part.part {
539 Some(get_response_part::Part::ChunkData(data)) => {
540 if tx.send(Ok(data.into())).await.is_err() {
541 break;
542 }
543 }
544 Some(get_response_part::Part::Metadata(_)) => {
545 let _ = tx
546 .send(Err(ObjectStoreError::Generic {
547 store: "gRPC",
548 source: "Received metadata again in GetResponsePart stream"
549 .into(),
550 }))
551 .await;
552 break;
553 }
554 None => {
555 let _ = tx
556 .send(Err(ObjectStoreError::Generic {
557 store: "gRPC",
558 source: "Empty part in GetResponsePart stream".into(),
559 }))
560 .await;
561 break;
562 }
563 },
564 Err(status) => {
565 let _ = tx
566 .send(Err(storage_utils::map_status_to_os_error(
567 status,
568 Some(error_path_clone.clone()),
569 )))
570 .await;
571 break;
572 }
573 }
574 }
575 });
576
577 let calculated_range = request_options
578 .range
579 .map_or(0..object_meta.size, |r| match r {
580 OsGetRange::Bounded(br) => br.start..br.end,
581 OsGetRange::Offset(o) => std::cmp::min(o, object_meta.size)..object_meta.size,
582 OsGetRange::Suffix(s) => object_meta.size.saturating_sub(s)..object_meta.size,
583 });
584
585 Ok(GetResult {
586 payload: GetResultPayload::Stream(ReceiverStream::new(rx).boxed()),
587 meta: object_meta,
588 range: calculated_range,
589 attributes: OsAttributes::default(), })
591 }
592
593 async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
594 let mut client = self.client();
595 let path_str = location.to_string();
596
597 let proto_request = StorageHeadRequest {
598 binding_name: self.binding_name.clone(),
599 path: path_str.clone(),
600 };
601
602 let response = client
603 .head(Request::new(proto_request))
604 .await
605 .map_err(|s| storage_utils::map_status_to_os_error(s, Some(path_str)))?
606 .into_inner();
607 storage_utils::map_proto_object_meta_to_os(response)
608 }
609
610 async fn delete(&self, location: &Path) -> object_store::Result<()> {
611 let mut client = self.client();
612 let path_str = location.to_string();
613 let proto_request = StorageDeleteRequest {
614 binding_name: self.binding_name.clone(),
615 path: path_str.clone(),
616 };
617 client
618 .delete(Request::new(proto_request))
619 .await
620 .map_err(|s| storage_utils::map_status_to_os_error(s, Some(path_str)))?;
621 Ok(())
622 }
623
624 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
625 let mut client = self.client();
626 let binding_name = self.binding_name.clone();
627 let prefix_path_str = prefix.map(|p| p.to_string());
628
629 try_stream! { let proto_request = StorageListRequest {
631 binding_name: binding_name.clone(),
632 prefix: prefix_path_str.clone(),
633 offset: None,
634 };
635
636 let mut stream = client.list(Request::new(proto_request)).await
637 .map_err(|s| storage_utils::map_status_to_os_error(s, prefix_path_str.clone()))?
638 .into_inner();
639
640 while let Some(item_result) = stream.next().await {
641 match item_result {
642 Ok(proto_meta) => {
643 yield storage_utils::map_proto_object_meta_to_os(proto_meta)?;
644 }
645 Err(status) => {
646 Err(storage_utils::map_status_to_os_error(status, prefix_path_str.clone()))?;
647 }
648 }
649 }
650 }
651 .boxed()
652 }
653
654 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
655 let mut client = self.client();
656 let path_str = prefix.map(|p| p.to_string());
657 let proto_request = StorageListWithDelimiterRequest {
658 binding_name: self.binding_name.clone(),
659 prefix: path_str.clone(),
660 };
661 let response = client
662 .list_with_delimiter(Request::new(proto_request))
663 .await
664 .map_err(|s| storage_utils::map_status_to_os_error(s, path_str))?
665 .into_inner();
666 let common_prefixes = response
667 .common_prefixes
668 .into_iter()
669 .map(Path::from)
670 .collect();
671 let objects = response
672 .objects
673 .into_iter()
674 .map(storage_utils::map_proto_object_meta_to_os)
675 .collect::<Result<Vec<_>, _>>()?;
676 Ok(ListResult {
677 common_prefixes,
678 objects,
679 })
680 }
681
682 async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
683 let mut client = self.client();
684 let from_str = from.to_string();
685 let to_str = to.to_string();
686 let proto_request = StorageCopyRequest {
687 binding_name: self.binding_name.clone(),
688 from_path: from_str.clone(),
689 to_path: to_str.clone(),
690 };
691 client
692 .copy(Request::new(proto_request))
693 .await
694 .map_err(|s| {
695 storage_utils::map_status_to_os_error(
696 s,
697 Some(format!("copy from {} to {}", from_str, to_str)),
698 )
699 })?;
700 Ok(())
701 }
702
703 async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
704 let mut client = self.client();
705 let from_str = from.to_string();
706 let to_str = to.to_string();
707 let proto_request = StorageRenameRequest {
708 binding_name: self.binding_name.clone(),
709 from_path: from_str.clone(),
710 to_path: to_str.clone(),
711 };
712 client
713 .rename(Request::new(proto_request))
714 .await
715 .map_err(|s| {
716 storage_utils::map_status_to_os_error(
717 s,
718 Some(format!("rename from {} to {}", from_str, to_str)),
719 )
720 })?;
721 Ok(())
722 }
723
724 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
725 let mut client = self.client();
726 let from_str = from.to_string();
727 let to_str = to.to_string();
728 let proto_request = StorageCopyRequest {
729 binding_name: self.binding_name.clone(),
730 from_path: from_str.clone(),
731 to_path: to_str.clone(),
732 };
733 client
734 .copy_if_not_exists(Request::new(proto_request))
735 .await
736 .map_err(|s| {
737 storage_utils::map_status_to_os_error(
738 s,
739 Some(format!(
740 "copy_if_not_exists from {} to {}",
741 from_str, to_str
742 )),
743 )
744 })?;
745 Ok(())
746 }
747
748 async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
749 let mut client = self.client();
750 let from_str = from.to_string();
751 let to_str = to.to_string();
752 let proto_request = StorageRenameRequest {
753 binding_name: self.binding_name.clone(),
754 from_path: from_str.clone(),
755 to_path: to_str.clone(),
756 };
757 client
758 .rename_if_not_exists(Request::new(proto_request))
759 .await
760 .map_err(|s| {
761 storage_utils::map_status_to_os_error(
762 s,
763 Some(format!(
764 "rename_if_not_exists from {} to {}",
765 from_str, to_str
766 )),
767 )
768 })?;
769 Ok(())
770 }
771}
772
773#[derive(Debug)]
774struct GrpcMultipartUpload {
775 path: Path, client_stream_sender:
777 Option<mpsc::Sender<object_store::Result<StoragePutMultipartChunkRequest>>>,
778 response_join_handle: Option<JoinHandle<Result<tonic::Response<StoragePutResponse>, Status>>>,
779}
780
781#[async_trait]
782impl MultipartUpload for GrpcMultipartUpload {
783 fn put_part(&mut self, data: PutPayload) -> object_store::UploadPart {
784 let sender_clone = match self.client_stream_sender.as_ref() {
785 Some(s) => s.clone(),
786 None => {
787 return Box::pin(async {
788 Err(ObjectStoreError::Generic {
789 store: "GrpcMultipartUpload::put_part",
790 source: "Sender unavailable; put_part called after complete/abort or on failed init.".into(),
791 })
792 });
793 }
794 };
795
796 Box::pin(async move {
797 let bytes_data: Bytes = data.into(); let chunk_data_part = StoragePutMultipartChunkRequestPart::ChunkData(bytes_data.into());
799 let request = StoragePutMultipartChunkRequest {
800 part: Some(chunk_data_part),
801 };
802
803 sender_clone
804 .send(Ok(request))
805 .await
806 .map_err(|e| ObjectStoreError::Generic {
807 store: "GrpcMultipartUpload::put_part",
808 source: format!("Failed to send part, gRPC call might have failed: {}", e)
809 .into(),
810 })
811 })
812 }
813
814 async fn complete(&mut self) -> object_store::Result<PutResult> {
815 if let Some(sender) = self.client_stream_sender.take() {
816 drop(sender);
817 }
818
819 let handle = self
820 .response_join_handle
821 .take()
822 .ok_or_else(|| ObjectStoreError::Generic {
823 store: "GrpcMultipartUpload::complete",
824 source: "complete called more than once or on an already aborted/failed upload"
825 .into(),
826 })?;
827
828 match handle.await {
829 Ok(Ok(response)) => {
830 let put_response = response.into_inner();
831 Ok(PutResult {
832 e_tag: put_response.e_tag,
833 version: put_response.version,
834 })
835 }
836 Ok(Err(status)) => Err(storage_utils::map_status_to_os_error(
837 status,
838 Some(self.path.to_string()),
839 )),
840 Err(join_err) => Err(ObjectStoreError::from(join_err)),
841 }
842 }
843
844 async fn abort(&mut self) -> object_store::Result<()> {
845 if let Some(sender) = self.client_stream_sender.take() {
846 drop(sender);
847 }
848
849 if let Some(handle) = self.response_join_handle.take() {
850 handle.abort();
851 match handle.await {
852 Ok(Ok(_resp)) => {
853 Ok(())
856 }
857 Ok(Err(status)) => {
858 if status.code() == tonic::Code::Cancelled {
859 Ok(()) } else {
861 Err(storage_utils::map_status_to_os_error(
862 status,
863 Some(self.path.to_string()),
864 ))
865 }
866 }
867 Err(_join_err) => {
868 Ok(())
870 }
871 }
872 } else {
873 Ok(())
875 }
876 }
877}