1use crate::error::ErrorData;
2use crate::grpc::storage_utils;
3use crate::grpc::MAX_GRPC_MESSAGE_SIZE;
4use crate::{
5 grpc::storage_service::alien_bindings::storage::{
6 get_response_part,
7 storage_put_multipart_chunk_request::Part as StoragePutMultipartChunkRequestPart,
8 storage_service_client::StorageServiceClient, StorageCopyRequest, StorageDeleteRequest,
9 StorageGetBaseDirRequest, StorageGetUrlRequest, StorageHeadRequest, StorageHttpMethod,
10 StorageListRequest, StorageListWithDelimiterRequest, StoragePutMultipartChunkRequest,
11 StoragePutMultipartMetadata, StoragePutRequest, StoragePutResponse, StorageRenameRequest,
12 StorageSignedUrlRequest,
13 },
14 presigned::{LocalOperation, PresignedOperation, PresignedRequest, PresignedRequestBackend},
15 traits::Binding,
16};
17
18use alien_error::AlienError;
19use alien_error::Context as _;
20use alien_error::IntoAlienError as _;
21use async_stream::try_stream;
22use async_trait::async_trait;
23use bytes::Bytes;
24use chrono::{self, Utc};
25use futures::{stream::BoxStream, StreamExt};
26use object_store::{
27 path::Path, Attributes as OsAttributes, Error as ObjectStoreError, GetOptions,
28 GetRange as OsGetRange, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
29 PutMultipartOptions, PutOptions, PutPayload, PutResult,
30};
31use prost_types;
32use std::time::Duration;
33use std::{
34 collections::HashMap,
35 fmt::{Debug, Formatter},
36};
37use tokio::sync::mpsc;
38use tokio::task::JoinHandle;
39use tokio_stream::wrappers::ReceiverStream;
40use tonic::{transport::Channel, Request, Status};
41use url::Url;
42
/// Storage binding that forwards every operation to a remote storage service over gRPC.
///
/// The type implements the crate's `Storage` trait as well as
/// `object_store::ObjectStore`; both go through the generated `StorageServiceClient`.
#[derive(Debug)]
pub struct GrpcStorage {
    /// gRPC client; `client()` hands out a clone of it for each outgoing call.
    client: StorageServiceClient<Channel>,
    /// Binding name sent along with every request.
    binding_name: String,
    /// Base directory obtained from the service's `get_base_dir` RPC during construction.
    base_dir: Path,
    /// Base URL obtained from the service's `get_url` RPC during construction.
    base_url: Url,
}
50
51impl GrpcStorage {
52 pub async fn new(binding_name: String, grpc_address: String) -> crate::error::Result<Self> {
53 let channel = crate::providers::grpc_provider::create_grpc_channel(grpc_address).await?;
54 Self::new_from_channel(channel, binding_name).await
55 }
56
57 pub async fn new_from_channel(
58 channel: Channel,
59 binding_name: String,
60 ) -> crate::error::Result<Self> {
61 tracing::debug!(
62 "GrpcStorage::new_from_channel: Creating client for binding: {}",
63 binding_name
64 );
65 let mut client = StorageServiceClient::new(channel.clone())
66 .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE);
67
68 tracing::debug!(
69 "GrpcStorage::new_from_channel: Calling get_base_dir for binding: {}",
70 binding_name
71 );
72 let base_dir_req = StorageGetBaseDirRequest {
73 binding_name: binding_name.clone(),
74 };
75 let base_dir_resp = client
76 .get_base_dir(Request::new(base_dir_req))
77 .await
78 .into_alien_error()
79 .context(ErrorData::GrpcRequestFailed {
80 service: "storage".to_string(),
81 method: "get_base_dir".to_string(),
82 details: format!("Failed to get base directory for binding {}", binding_name),
83 })?
84 .into_inner();
85 let base_dir = Path::from(base_dir_resp.path);
86 tracing::debug!("GrpcStorage::new_from_channel: Got base_dir: {}", base_dir);
87
88 tracing::debug!(
89 "GrpcStorage::new_from_channel: Calling get_url for binding: {}",
90 binding_name
91 );
92 let get_url_req = StorageGetUrlRequest {
93 binding_name: binding_name.clone(),
94 };
95 let get_url_resp = client
96 .get_url(Request::new(get_url_req))
97 .await
98 .into_alien_error()
99 .context(ErrorData::GrpcRequestFailed {
100 service: "storage".to_string(),
101 method: "get_url".to_string(),
102 details: format!("Failed to get URL for binding {}", binding_name),
103 })?
104 .into_inner();
105 tracing::debug!(
106 "GrpcStorage::new_from_channel: Got url: {}",
107 get_url_resp.url
108 );
109 let base_url = Url::parse(&get_url_resp.url).into_alien_error().context(
110 ErrorData::BindingConfigInvalid {
111 binding_name: binding_name.clone(),
112 reason: format!("Invalid base_url: {}", get_url_resp.url),
113 },
114 )?;
115
116 Ok(Self {
117 client: StorageServiceClient::new(channel) .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE)
119 .max_encoding_message_size(MAX_GRPC_MESSAGE_SIZE),
120 binding_name,
121 base_dir,
122 base_url,
123 })
124 }
125
126 fn client(&self) -> StorageServiceClient<Channel> {
127 self.client.clone()
128 }
129
130 fn http_presigned_backend(
131 url: &str,
132 method: &str,
133 headers: HashMap<String, String>,
134 invalid_url_reason: &str,
135 ) -> crate::error::Result<PresignedRequestBackend> {
136 Url::parse(url)
137 .into_alien_error()
138 .context(ErrorData::InvalidConfigurationUrl {
139 url: url.to_string(),
140 reason: invalid_url_reason.to_string(),
141 })?;
142
143 Ok(PresignedRequestBackend::Http {
144 url: url.to_string(),
145 method: method.to_string(),
146 headers,
147 })
148 }
149}
150
151impl std::fmt::Display for GrpcStorage {
152 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
153 write!(
154 f,
155 "GrpcStorage(binding='{}', base_url='{}')",
156 self.binding_name, self.base_url
157 )
158 }
159}
160
// Opts `GrpcStorage` into the crate's `Binding` trait. The impl body is empty, so nothing is
// overridden here.
impl Binding for GrpcStorage {}
162
163#[async_trait]
164impl crate::Storage for GrpcStorage {
165 fn get_base_dir(&self) -> Path {
166 self.base_dir.clone()
167 }
168
169 fn get_url(&self) -> Url {
170 self.base_url.clone()
171 }
172
173 async fn presigned_put(
174 &self,
175 path: &Path,
176 expires_in: Duration,
177 ) -> crate::error::Result<PresignedRequest> {
178 let mut client = self.client();
179 let expiration = Utc::now()
180 + chrono::Duration::from_std(expires_in)
181 .into_alien_error()
182 .context(ErrorData::Other {
183 message: "Invalid duration for presigned PUT request".to_string(),
184 })?;
185
186 let request = StorageSignedUrlRequest {
187 binding_name: self.binding_name.clone(),
188 path: path.to_string(),
189 http_method: StorageHttpMethod::HttpMethodPut as i32,
190 expiration_time: Some(prost_types::Timestamp {
191 seconds: expiration.timestamp(),
192 nanos: expiration.timestamp_subsec_nanos() as i32,
193 }),
194 };
195
196 let response = client
197 .signed_url(tonic::Request::new(request))
198 .await
199 .into_alien_error()
200 .context(ErrorData::GrpcRequestFailed {
201 service: "storage".to_string(),
202 method: "signed_url".to_string(),
203 details: "Failed to generate presigned PUT URL".to_string(),
204 })?
205 .into_inner();
206
207 let headers = response
209 .headers
210 .into_iter()
211 .map(|header| (header.key, header.value))
212 .collect();
213 let url = &response.url;
214 let backend = if url.starts_with("http://") || url.starts_with("https://") {
215 Self::http_presigned_backend(url, "PUT", headers, "Invalid presigned PUT URL format")?
216 } else if url.starts_with("local://") {
217 let file_path = url.strip_prefix("local://").unwrap_or(url);
219 PresignedRequestBackend::Local {
220 file_path: file_path.to_string(),
221 operation: LocalOperation::Put,
222 }
223 } else {
224 return Err(AlienError::new(ErrorData::InvalidConfigurationUrl {
225 url: url.to_string(),
226 reason: "Unsupported presigned URL scheme".to_string(),
227 }));
228 };
229
230 Ok(PresignedRequest {
231 backend,
232 expiration,
233 operation: PresignedOperation::Put,
234 path: path.to_string(),
235 })
236 }
237
238 async fn presigned_get(
239 &self,
240 path: &Path,
241 expires_in: Duration,
242 ) -> crate::error::Result<PresignedRequest> {
243 let mut client = self.client();
244 let expiration = Utc::now()
245 + chrono::Duration::from_std(expires_in)
246 .into_alien_error()
247 .context(ErrorData::Other {
248 message: "Invalid duration for presigned GET request".to_string(),
249 })?;
250
251 let request = StorageSignedUrlRequest {
252 binding_name: self.binding_name.clone(),
253 path: path.to_string(),
254 http_method: StorageHttpMethod::HttpMethodGet as i32,
255 expiration_time: Some(prost_types::Timestamp {
256 seconds: expiration.timestamp(),
257 nanos: expiration.timestamp_subsec_nanos() as i32,
258 }),
259 };
260
261 let response = client
262 .signed_url(tonic::Request::new(request))
263 .await
264 .into_alien_error()
265 .context(ErrorData::GrpcRequestFailed {
266 service: "storage".to_string(),
267 method: "signed_url".to_string(),
268 details: "Failed to generate presigned GET URL".to_string(),
269 })?
270 .into_inner();
271
272 let headers = response
274 .headers
275 .into_iter()
276 .map(|header| (header.key, header.value))
277 .collect();
278 let url = &response.url;
279 let backend = if url.starts_with("http://") || url.starts_with("https://") {
280 Self::http_presigned_backend(url, "GET", headers, "Invalid presigned GET URL format")?
281 } else if url.starts_with("local://") {
282 let file_path = url.strip_prefix("local://").unwrap_or(url);
284 PresignedRequestBackend::Local {
285 file_path: file_path.to_string(),
286 operation: LocalOperation::Get,
287 }
288 } else {
289 return Err(AlienError::new(ErrorData::InvalidConfigurationUrl {
290 url: url.to_string(),
291 reason: "Unsupported presigned URL scheme".to_string(),
292 }));
293 };
294
295 Ok(PresignedRequest {
296 backend,
297 expiration,
298 operation: PresignedOperation::Get,
299 path: path.to_string(),
300 })
301 }
302
303 async fn presigned_delete(
304 &self,
305 path: &Path,
306 expires_in: Duration,
307 ) -> crate::error::Result<PresignedRequest> {
308 let mut client = self.client();
309 let expiration = Utc::now()
310 + chrono::Duration::from_std(expires_in)
311 .into_alien_error()
312 .context(ErrorData::Other {
313 message: "Invalid duration for presigned DELETE request".to_string(),
314 })?;
315
316 let request = StorageSignedUrlRequest {
317 binding_name: self.binding_name.clone(),
318 path: path.to_string(),
319 http_method: StorageHttpMethod::HttpMethodDelete as i32,
320 expiration_time: Some(prost_types::Timestamp {
321 seconds: expiration.timestamp(),
322 nanos: expiration.timestamp_subsec_nanos() as i32,
323 }),
324 };
325
326 let response = client
327 .signed_url(tonic::Request::new(request))
328 .await
329 .into_alien_error()
330 .context(ErrorData::GrpcRequestFailed {
331 service: "storage".to_string(),
332 method: "signed_url".to_string(),
333 details: "Failed to generate presigned DELETE URL".to_string(),
334 })?
335 .into_inner();
336
337 let headers = response
339 .headers
340 .into_iter()
341 .map(|header| (header.key, header.value))
342 .collect();
343 let url = &response.url;
344 let backend = if url.starts_with("http://") || url.starts_with("https://") {
345 Self::http_presigned_backend(
346 url,
347 "DELETE",
348 headers,
349 "Invalid presigned DELETE URL format",
350 )?
351 } else if url.starts_with("local://") {
352 let file_path = url.strip_prefix("local://").unwrap_or(url);
354 PresignedRequestBackend::Local {
355 file_path: file_path.to_string(),
356 operation: LocalOperation::Delete,
357 }
358 } else {
359 return Err(AlienError::new(ErrorData::InvalidConfigurationUrl {
360 url: url.to_string(),
361 reason: "Unsupported presigned URL scheme".to_string(),
362 }));
363 };
364
365 Ok(PresignedRequest {
366 backend,
367 expiration,
368 operation: PresignedOperation::Delete,
369 path: path.to_string(),
370 })
371 }
372}
373
374#[async_trait]
375impl object_store::ObjectStore for GrpcStorage {
376 async fn put_opts(
377 &self,
378 location: &Path,
379 payload: PutPayload,
380 options: PutOptions,
381 ) -> object_store::Result<PutResult> {
382 let mut client = self.client();
383 let path_str = location.to_string();
384 let data_bytes: Bytes = payload.into();
386
387 let proto_request = StoragePutRequest {
388 binding_name: self.binding_name.clone(),
389 path: path_str.clone(),
390 data: data_bytes.into(), options: storage_utils::map_os_put_options_to_proto(options),
392 };
393
394 let response = client
395 .put(Request::new(proto_request))
396 .await
397 .map_err(|s| storage_utils::map_status_to_os_error(s, Some(path_str)))?
398 .into_inner();
399
400 Ok(PutResult {
401 e_tag: response.e_tag,
402 version: response.version,
403 })
404 }
405
406 async fn put_multipart_opts(
407 &self,
408 location: &Path,
409 opts: PutMultipartOptions,
410 ) -> object_store::Result<Box<dyn MultipartUpload>> {
411 let mut client = self.client();
412 let path_str = location.to_string();
413
414 let metadata_proto = StoragePutMultipartMetadata {
415 binding_name: self.binding_name.clone(),
416 path: path_str.clone(),
417 options: storage_utils::map_os_put_multipart_opts_to_proto(opts),
418 };
419
420 let initial_request_part = StoragePutMultipartChunkRequestPart::Metadata(metadata_proto);
421 let initial_request = StoragePutMultipartChunkRequest {
422 part: Some(initial_request_part),
423 };
424
425 let (tx, rx) = mpsc::channel::<object_store::Result<StoragePutMultipartChunkRequest>>(4);
426
427 tx.send(Ok(initial_request))
428 .await
429 .map_err(|_e| ObjectStoreError::Generic {
430 store: "GrpcClient::put_multipart_opts",
431 source: "Failed to send initial metadata for multipart upload".into(),
432 })?;
433
434 let request_stream = ReceiverStream::new(rx).map(|result_item| {
438 match result_item {
439 Ok(req) => req, Err(e) => { panic!("Error received in put_multipart request stream from mpsc: {:?}. This should not happen.", e);
452 }
453 }
454 });
455
456 let response_join_handle: JoinHandle<Result<tonic::Response<StoragePutResponse>, Status>> =
457 tokio::spawn(async move { client.put_multipart(Request::new(request_stream)).await });
458
459 Ok(Box::new(GrpcMultipartUpload {
460 path: location.clone(),
461 client_stream_sender: Some(tx),
462 response_join_handle: Some(response_join_handle),
463 }))
464 }
465
466 async fn get_opts(
467 &self,
468 location: &Path,
469 options: GetOptions,
470 ) -> object_store::Result<GetResult> {
471 let mut client = self.client();
472 let path_str = location.to_string();
473
474 let request_options = options.clone(); let proto_request = storage_utils::map_os_get_options_to_proto_request(
476 options,
477 self.binding_name.clone(),
478 path_str.clone(),
479 );
480
481 let mut grpc_stream = client
482 .get(Request::new(proto_request))
483 .await
484 .map_err(|s| storage_utils::map_status_to_os_error(s, Some(path_str.clone())))?
485 .into_inner();
486
487 let first_part_res = grpc_stream.next().await;
488 let first_part = match first_part_res {
489 Some(Ok(part)) => part,
490 Some(Err(status)) => {
491 return Err(storage_utils::map_status_to_os_error(
492 status,
493 Some(path_str.clone()),
494 ))
495 }
496 None => {
497 return Err(ObjectStoreError::Generic {
498 store: "gRPC",
499 source: "GetResponsePart stream was empty, expected metadata".into(),
500 })
501 }
502 };
503
504 let proto_meta = match first_part.part {
505 Some(get_response_part::Part::Metadata(meta)) => meta,
506 _ => {
507 return Err(ObjectStoreError::Generic {
508 store: "gRPC",
509 source: "First message in GetResponsePart stream was not Metadata".into(),
510 })
511 }
512 };
513 let object_meta = storage_utils::map_proto_object_meta_to_os(proto_meta)?;
514
515 let (tx, rx) = mpsc::channel::<object_store::Result<Bytes>>(4);
516 let error_path_clone = path_str.clone();
517
518 tokio::spawn(async move {
519 while let Some(stream_item_result) = grpc_stream.next().await {
520 match stream_item_result {
521 Ok(response_part) => match response_part.part {
522 Some(get_response_part::Part::ChunkData(data)) => {
523 if tx.send(Ok(data.into())).await.is_err() {
524 break;
525 }
526 }
527 Some(get_response_part::Part::Metadata(_)) => {
528 let _ = tx
529 .send(Err(ObjectStoreError::Generic {
530 store: "gRPC",
531 source: "Received metadata again in GetResponsePart stream"
532 .into(),
533 }))
534 .await;
535 break;
536 }
537 None => {
538 let _ = tx
539 .send(Err(ObjectStoreError::Generic {
540 store: "gRPC",
541 source: "Empty part in GetResponsePart stream".into(),
542 }))
543 .await;
544 break;
545 }
546 },
547 Err(status) => {
548 let _ = tx
549 .send(Err(storage_utils::map_status_to_os_error(
550 status,
551 Some(error_path_clone.clone()),
552 )))
553 .await;
554 break;
555 }
556 }
557 }
558 });
559
560 let calculated_range = request_options
561 .range
562 .map_or(0..object_meta.size, |r| match r {
563 OsGetRange::Bounded(br) => br.start..br.end,
564 OsGetRange::Offset(o) => std::cmp::min(o, object_meta.size)..object_meta.size,
565 OsGetRange::Suffix(s) => object_meta.size.saturating_sub(s)..object_meta.size,
566 });
567
568 Ok(GetResult {
569 payload: GetResultPayload::Stream(ReceiverStream::new(rx).boxed()),
570 meta: object_meta,
571 range: calculated_range,
572 attributes: OsAttributes::default(), })
574 }
575
576 async fn head(&self, location: &Path) -> object_store::Result<ObjectMeta> {
577 let mut client = self.client();
578 let path_str = location.to_string();
579
580 let proto_request = StorageHeadRequest {
581 binding_name: self.binding_name.clone(),
582 path: path_str.clone(),
583 };
584
585 let response = client
586 .head(Request::new(proto_request))
587 .await
588 .map_err(|s| storage_utils::map_status_to_os_error(s, Some(path_str)))?
589 .into_inner();
590 storage_utils::map_proto_object_meta_to_os(response)
591 }
592
593 async fn delete(&self, location: &Path) -> object_store::Result<()> {
594 let mut client = self.client();
595 let path_str = location.to_string();
596 let proto_request = StorageDeleteRequest {
597 binding_name: self.binding_name.clone(),
598 path: path_str.clone(),
599 };
600 client
601 .delete(Request::new(proto_request))
602 .await
603 .map_err(|s| storage_utils::map_status_to_os_error(s, Some(path_str)))?;
604 Ok(())
605 }
606
607 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
608 let mut client = self.client();
609 let binding_name = self.binding_name.clone();
610 let prefix_path_str = prefix.map(|p| p.to_string());
611
612 try_stream! { let proto_request = StorageListRequest {
614 binding_name: binding_name.clone(),
615 prefix: prefix_path_str.clone(),
616 offset: None,
617 };
618
619 let mut stream = client.list(Request::new(proto_request)).await
620 .map_err(|s| storage_utils::map_status_to_os_error(s, prefix_path_str.clone()))?
621 .into_inner();
622
623 while let Some(item_result) = stream.next().await {
624 match item_result {
625 Ok(proto_meta) => {
626 yield storage_utils::map_proto_object_meta_to_os(proto_meta)?;
627 }
628 Err(status) => {
629 Err(storage_utils::map_status_to_os_error(status, prefix_path_str.clone()))?;
630 }
631 }
632 }
633 }
634 .boxed()
635 }
636
637 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
638 let mut client = self.client();
639 let path_str = prefix.map(|p| p.to_string());
640 let proto_request = StorageListWithDelimiterRequest {
641 binding_name: self.binding_name.clone(),
642 prefix: path_str.clone(),
643 };
644 let response = client
645 .list_with_delimiter(Request::new(proto_request))
646 .await
647 .map_err(|s| storage_utils::map_status_to_os_error(s, path_str))?
648 .into_inner();
649 let common_prefixes = response
650 .common_prefixes
651 .into_iter()
652 .map(Path::from)
653 .collect();
654 let objects = response
655 .objects
656 .into_iter()
657 .map(storage_utils::map_proto_object_meta_to_os)
658 .collect::<Result<Vec<_>, _>>()?;
659 Ok(ListResult {
660 common_prefixes,
661 objects,
662 })
663 }
664
665 async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
666 let mut client = self.client();
667 let from_str = from.to_string();
668 let to_str = to.to_string();
669 let proto_request = StorageCopyRequest {
670 binding_name: self.binding_name.clone(),
671 from_path: from_str.clone(),
672 to_path: to_str.clone(),
673 };
674 client
675 .copy(Request::new(proto_request))
676 .await
677 .map_err(|s| {
678 storage_utils::map_status_to_os_error(
679 s,
680 Some(format!("copy from {} to {}", from_str, to_str)),
681 )
682 })?;
683 Ok(())
684 }
685
686 async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
687 let mut client = self.client();
688 let from_str = from.to_string();
689 let to_str = to.to_string();
690 let proto_request = StorageRenameRequest {
691 binding_name: self.binding_name.clone(),
692 from_path: from_str.clone(),
693 to_path: to_str.clone(),
694 };
695 client
696 .rename(Request::new(proto_request))
697 .await
698 .map_err(|s| {
699 storage_utils::map_status_to_os_error(
700 s,
701 Some(format!("rename from {} to {}", from_str, to_str)),
702 )
703 })?;
704 Ok(())
705 }
706
707 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
708 let mut client = self.client();
709 let from_str = from.to_string();
710 let to_str = to.to_string();
711 let proto_request = StorageCopyRequest {
712 binding_name: self.binding_name.clone(),
713 from_path: from_str.clone(),
714 to_path: to_str.clone(),
715 };
716 client
717 .copy_if_not_exists(Request::new(proto_request))
718 .await
719 .map_err(|s| {
720 storage_utils::map_status_to_os_error(
721 s,
722 Some(format!(
723 "copy_if_not_exists from {} to {}",
724 from_str, to_str
725 )),
726 )
727 })?;
728 Ok(())
729 }
730
731 async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
732 let mut client = self.client();
733 let from_str = from.to_string();
734 let to_str = to.to_string();
735 let proto_request = StorageRenameRequest {
736 binding_name: self.binding_name.clone(),
737 from_path: from_str.clone(),
738 to_path: to_str.clone(),
739 };
740 client
741 .rename_if_not_exists(Request::new(proto_request))
742 .await
743 .map_err(|s| {
744 storage_utils::map_status_to_os_error(
745 s,
746 Some(format!(
747 "rename_if_not_exists from {} to {}",
748 from_str, to_str
749 )),
750 )
751 })?;
752 Ok(())
753 }
754}
755
#[cfg(test)]
mod tests {
    use super::*;

    /// Unwraps the HTTP variant into `(url, method, headers)`; any other backend fails the test.
    fn expect_http(backend: PresignedRequestBackend) -> (String, String, HashMap<String, String>) {
        match backend {
            PresignedRequestBackend::Http {
                url,
                method,
                headers,
            } => (url, method, headers),
            PresignedRequestBackend::Local { .. } => panic!("expected HTTP backend"),
        }
    }

    /// Query-string authentication must stay in the URL and must not leak into the headers.
    #[test]
    fn http_presigned_backend_keeps_query_auth_out_of_headers() {
        let signed_url = concat!(
            "https://bucket.s3.us-east-2.amazonaws.com/key",
            "?X-Amz-Algorithm=AWS4-HMAC-SHA256",
            "&X-Amz-Credential=AKIA%2F20260527%2Fus-east-2%2Fs3%2Faws4_request",
            "&X-Amz-Date=20260527T173759Z",
            "&X-Amz-Expires=300",
            "&X-Amz-SignedHeaders=host",
            "&X-Amz-Signature=abc123",
        );

        let backend = GrpcStorage::http_presigned_backend(
            signed_url,
            "PUT",
            HashMap::new(),
            "Invalid presigned PUT URL format",
        )
        .expect("valid signed URL");

        let (url, method, headers) = expect_http(backend);
        assert_eq!(url, signed_url);
        assert_eq!(method, "PUT");
        assert!(headers.is_empty());
    }

    /// Headers supplied by the provider must be passed through untouched.
    #[test]
    fn http_presigned_backend_preserves_provider_headers() {
        let signed_url = "https://account.blob.core.windows.net/container/key?sas=token";
        let provider_headers: HashMap<String, String> =
            std::iter::once(("x-ms-blob-type".to_string(), "BlockBlob".to_string())).collect();

        let backend = GrpcStorage::http_presigned_backend(
            signed_url,
            "PUT",
            provider_headers,
            "Invalid presigned PUT URL format",
        )
        .expect("valid signed URL");

        let (_url, _method, headers) = expect_http(backend);
        assert_eq!(
            headers.get("x-ms-blob-type"),
            Some(&"BlockBlob".to_string())
        );
    }
}
817
/// Handle for a multipart upload that is carried by one client-streaming `put_multipart` RPC.
#[derive(Debug)]
struct GrpcMultipartUpload {
    /// Object path; used as context when a gRPC status is mapped to an error.
    path: Path,
    /// Sender feeding the RPC's request stream; `None` once `complete` or `abort` has taken it.
    client_stream_sender:
        Option<mpsc::Sender<object_store::Result<StoragePutMultipartChunkRequest>>>,
    /// Handle of the spawned task awaiting the RPC response; taken by `complete` or `abort`.
    response_join_handle: Option<JoinHandle<Result<tonic::Response<StoragePutResponse>, Status>>>,
}
825
826#[async_trait]
827impl MultipartUpload for GrpcMultipartUpload {
828 fn put_part(&mut self, data: PutPayload) -> object_store::UploadPart {
829 let sender_clone = match self.client_stream_sender.as_ref() {
830 Some(s) => s.clone(),
831 None => {
832 return Box::pin(async {
833 Err(ObjectStoreError::Generic {
834 store: "GrpcMultipartUpload::put_part",
835 source: "Sender unavailable; put_part called after complete/abort or on failed init.".into(),
836 })
837 });
838 }
839 };
840
841 Box::pin(async move {
842 let bytes_data: Bytes = data.into(); let chunk_data_part = StoragePutMultipartChunkRequestPart::ChunkData(bytes_data.into());
844 let request = StoragePutMultipartChunkRequest {
845 part: Some(chunk_data_part),
846 };
847
848 sender_clone
849 .send(Ok(request))
850 .await
851 .map_err(|e| ObjectStoreError::Generic {
852 store: "GrpcMultipartUpload::put_part",
853 source: format!("Failed to send part, gRPC call might have failed: {}", e)
854 .into(),
855 })
856 })
857 }
858
859 async fn complete(&mut self) -> object_store::Result<PutResult> {
860 if let Some(sender) = self.client_stream_sender.take() {
861 drop(sender);
862 }
863
864 let handle = self
865 .response_join_handle
866 .take()
867 .ok_or_else(|| ObjectStoreError::Generic {
868 store: "GrpcMultipartUpload::complete",
869 source: "complete called more than once or on an already aborted/failed upload"
870 .into(),
871 })?;
872
873 match handle.await {
874 Ok(Ok(response)) => {
875 let put_response = response.into_inner();
876 Ok(PutResult {
877 e_tag: put_response.e_tag,
878 version: put_response.version,
879 })
880 }
881 Ok(Err(status)) => Err(storage_utils::map_status_to_os_error(
882 status,
883 Some(self.path.to_string()),
884 )),
885 Err(join_err) => Err(ObjectStoreError::from(join_err)),
886 }
887 }
888
889 async fn abort(&mut self) -> object_store::Result<()> {
890 if let Some(sender) = self.client_stream_sender.take() {
891 drop(sender);
892 }
893
894 if let Some(handle) = self.response_join_handle.take() {
895 handle.abort();
896 match handle.await {
897 Ok(Ok(_resp)) => {
898 Ok(())
901 }
902 Ok(Err(status)) => {
903 if status.code() == tonic::Code::Cancelled {
904 Ok(()) } else {
906 Err(storage_utils::map_status_to_os_error(
907 status,
908 Some(self.path.to_string()),
909 ))
910 }
911 }
912 Err(_join_err) => {
913 Ok(())
915 }
916 }
917 } else {
918 Ok(())
920 }
921 }
922}