1use std::collections::BTreeMap;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::time::{Duration, SystemTime};
5
6use tokio_stream::{Stream, StreamExt};
7use tonic::codegen::async_trait;
8use tonic::{Request as GrpcRequest, Response as GrpcResponse, Status};
9
10use crate::api::RuntimeMetadata;
11use crate::error::Result as ProviderResult;
12use crate::generated::v1 as pb;
13use crate::protocol;
14use crate::rpc_status::rpc_status;
15
16pub type S3ReadObjectStream =
18 Pin<Box<dyn Stream<Item = ProviderResult<S3ReadObjectFrame>> + Send + 'static>>;
19type S3RpcReadObjectStream =
20 Pin<Box<dyn Stream<Item = std::result::Result<pb::ReadObjectChunk, Status>> + Send + 'static>>;
21
22#[derive(Clone, Debug, PartialEq)]
23pub enum S3ReadObjectFrame {
25 Meta(ObjectMeta),
27 Data(Vec<u8>),
29}
30
31#[derive(Clone, Debug, PartialEq)]
32pub enum S3WriteObjectFrame {
34 Open(Box<S3WriteObjectOpen>),
36 Data(Vec<u8>),
38 Empty,
40}
41
42#[derive(Clone, Debug, Default, Eq, PartialEq)]
43pub struct S3WriteObjectOpen {
45 pub reference: Option<ObjectRef>,
47 pub content_type: String,
49 pub cache_control: String,
51 pub content_disposition: String,
53 pub content_encoding: String,
55 pub content_language: String,
57 pub metadata: BTreeMap<String, String>,
59 pub if_match: String,
61 pub if_none_match: String,
63}
64
65pub struct S3WriteObjectStream {
67 inner: tonic::Streaming<pb::WriteObjectRequest>,
68}
69
70impl S3WriteObjectStream {
71 pub(crate) fn new(inner: tonic::Streaming<pb::WriteObjectRequest>) -> Self {
72 Self { inner }
73 }
74
75 pub async fn message(&mut self) -> ProviderResult<Option<S3WriteObjectFrame>> {
77 self.inner
78 .message()
79 .await
80 .map_err(|error| crate::Error::new(error.to_string()))?
81 .map(write_object_frame_from_proto)
82 .transpose()
83 }
84}
85
/// Identifies an object by key and version identifier.
///
/// Mirrors the protobuf `S3ObjectRef` message field for field.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ObjectRef {
    /// Object key.
    pub key: String,
    /// Version identifier; empty when none was supplied.
    pub version_id: String,
}
94
95#[derive(Clone, Debug, Default, PartialEq)]
96pub struct ObjectMeta {
98 pub reference: ObjectRef,
100 pub etag: String,
102 pub size: i64,
104 pub content_type: String,
106 pub last_modified: Option<SystemTime>,
108 pub metadata: BTreeMap<String, String>,
110 pub storage_class: String,
112}
113
/// Optional start/end bounds of a ranged read.
///
/// Both bounds are copied as-is from the protobuf range message; this type
/// performs no validation of its own.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ByteRange {
    /// Start bound, if supplied.
    pub start: Option<i64>,
    /// End bound, if supplied.
    pub end: Option<i64>,
}
122
/// HTTP method a presigned URL is issued for.
///
/// Mirrors the protobuf `PresignMethod` enum variant for variant.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum PresignMethod {
    /// No method given; also used for unknown wire values.
    #[default]
    Unspecified,
    /// `GET`.
    Get,
    /// `PUT`.
    Put,
    /// `DELETE`.
    Delete,
    /// `HEAD`.
    Head,
}
138
139#[derive(Clone, Debug, Default, PartialEq, Eq)]
140pub struct HeadObjectRequest {
142 pub reference: Option<ObjectRef>,
144}
145
146#[derive(Clone, Debug, Default, PartialEq)]
147pub struct HeadObjectResponse {
149 pub meta: Option<ObjectMeta>,
151}
152
153#[derive(Clone, Debug, Default, PartialEq)]
154pub struct ReadObjectRequest {
156 pub reference: Option<ObjectRef>,
158 pub range: Option<ByteRange>,
160 pub if_match: String,
162 pub if_none_match: String,
164 pub if_modified_since: Option<SystemTime>,
166 pub if_unmodified_since: Option<SystemTime>,
168}
169
170#[derive(Clone, Debug, Default, PartialEq)]
171pub struct WriteObjectResponse {
173 pub meta: Option<ObjectMeta>,
175}
176
177#[derive(Clone, Debug, Default, PartialEq, Eq)]
178pub struct DeleteObjectRequest {
180 pub reference: Option<ObjectRef>,
182}
183
/// Request for [`S3Provider::list_objects`].
///
/// All fields are copied verbatim from the protobuf request; their
/// interpretation is left to the provider.
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct ListObjectsRequest {
    /// Value of the request's `prefix` field.
    pub prefix: String,
    /// Value of the request's `delimiter` field.
    pub delimiter: String,
    /// Value of the request's `continuation_token` field.
    pub continuation_token: String,
    /// Value of the request's `start_after` field.
    pub start_after: String,
    /// Value of the request's `max_keys` field.
    pub max_keys: i32,
}
198
199#[derive(Clone, Debug, Default, PartialEq)]
200pub struct ListObjectsResponse {
202 pub objects: Vec<ObjectMeta>,
204 pub common_prefixes: Vec<String>,
206 pub next_continuation_token: String,
208 pub has_more: bool,
210}
211
212#[derive(Clone, Debug, Default, PartialEq, Eq)]
213pub struct CopyObjectRequest {
215 pub source: Option<ObjectRef>,
217 pub destination: Option<ObjectRef>,
219 pub if_match: String,
221 pub if_none_match: String,
223}
224
225#[derive(Clone, Debug, Default, PartialEq)]
226pub struct CopyObjectResponse {
228 pub meta: Option<ObjectMeta>,
230}
231
232#[derive(Clone, Debug, Default, PartialEq, Eq)]
233pub struct PresignObjectRequest {
235 pub reference: Option<ObjectRef>,
237 pub method: PresignMethod,
239 pub expires: Duration,
241 pub content_type: String,
243 pub content_disposition: String,
245 pub headers: BTreeMap<String, String>,
247}
248
249#[derive(Clone, Debug, Default, PartialEq)]
250pub struct PresignObjectResponse {
252 pub url: String,
254 pub method: PresignMethod,
256 pub expires_at: Option<SystemTime>,
258 pub headers: BTreeMap<String, String>,
260}
261
262#[async_trait]
263pub trait S3Provider: Send + Sync + 'static {
265 async fn configure(
267 &self,
268 _name: &str,
269 _config: serde_json::Map<String, serde_json::Value>,
270 ) -> ProviderResult<()> {
271 Ok(())
272 }
273
274 fn metadata(&self) -> Option<RuntimeMetadata> {
276 None
277 }
278
279 fn warnings(&self) -> Vec<String> {
281 Vec::new()
282 }
283
284 async fn health_check(&self) -> ProviderResult<()> {
286 Ok(())
287 }
288
289 async fn start(&self) -> ProviderResult<()> {
291 Ok(())
292 }
293
294 async fn close(&self) -> ProviderResult<()> {
296 Ok(())
297 }
298
299 async fn head_object(&self, _request: HeadObjectRequest) -> ProviderResult<HeadObjectResponse> {
301 Err(crate::Error::unimplemented(
302 "s3 head object is not implemented",
303 ))
304 }
305
306 async fn read_object(&self, _request: ReadObjectRequest) -> ProviderResult<S3ReadObjectStream> {
308 Err(crate::Error::unimplemented(
309 "s3 read object is not implemented",
310 ))
311 }
312
313 async fn write_object(
315 &self,
316 _request: S3WriteObjectStream,
317 ) -> ProviderResult<WriteObjectResponse> {
318 Err(crate::Error::unimplemented(
319 "s3 write object is not implemented",
320 ))
321 }
322
323 async fn delete_object(&self, _request: DeleteObjectRequest) -> ProviderResult<()> {
325 Err(crate::Error::unimplemented(
326 "s3 delete object is not implemented",
327 ))
328 }
329
330 async fn list_objects(
332 &self,
333 _request: ListObjectsRequest,
334 ) -> ProviderResult<ListObjectsResponse> {
335 Err(crate::Error::unimplemented(
336 "s3 list objects is not implemented",
337 ))
338 }
339
340 async fn copy_object(&self, _request: CopyObjectRequest) -> ProviderResult<CopyObjectResponse> {
342 Err(crate::Error::unimplemented(
343 "s3 copy object is not implemented",
344 ))
345 }
346
347 async fn presign_object(
349 &self,
350 _request: PresignObjectRequest,
351 ) -> ProviderResult<PresignObjectResponse> {
352 Err(crate::Error::unimplemented(
353 "s3 presign object is not implemented",
354 ))
355 }
356}
357
/// gRPC service adapter that forwards S3 RPCs to a shared provider.
///
/// The provider is held behind an [`Arc`], so clones of the server share the
/// same provider instance.
pub(crate) struct S3RpcServer<P> {
    /// Shared handle to the provider implementation.
    provider: Arc<P>,
}

// Written by hand instead of `#[derive(Clone)]`: the derive would add a
// `P: Clone` bound even though only the `Arc` handle is ever cloned, which
// would make the server non-cloneable for providers that are not `Clone`.
impl<P> Clone for S3RpcServer<P> {
    fn clone(&self) -> Self {
        Self {
            provider: Arc::clone(&self.provider),
        }
    }
}
362
363impl<P> S3RpcServer<P> {
364 pub(crate) fn new(provider: Arc<P>) -> Self {
365 Self { provider }
366 }
367}
368
369#[async_trait]
370impl<P> pb::s3_server::S3 for S3RpcServer<P>
371where
372 P: S3Provider,
373{
374 type ReadObjectStream = S3RpcReadObjectStream;
375
376 async fn head_object(
377 &self,
378 request: GrpcRequest<pb::HeadObjectRequest>,
379 ) -> std::result::Result<GrpcResponse<pb::HeadObjectResponse>, Status> {
380 let response = self
381 .provider
382 .head_object(head_object_request_from_proto(request.into_inner()))
383 .await
384 .map_err(|error| rpc_status("s3 head object", error))?;
385 Ok(GrpcResponse::new(
386 head_object_response_to_proto(response)
387 .map_err(|error| rpc_status("s3 head object", error))?,
388 ))
389 }
390
391 async fn read_object(
392 &self,
393 request: GrpcRequest<pb::ReadObjectRequest>,
394 ) -> std::result::Result<GrpcResponse<Self::ReadObjectStream>, Status> {
395 let stream = self
396 .provider
397 .read_object(
398 read_object_request_from_proto(request.into_inner())
399 .map_err(|error| rpc_status("s3 read object", error))?,
400 )
401 .await
402 .map_err(|error| rpc_status("s3 read object", error))?
403 .map(|chunk| {
404 chunk
405 .and_then(read_object_frame_to_proto)
406 .map_err(|error| rpc_status("s3 read object stream", error))
407 });
408 Ok(GrpcResponse::new(Box::pin(stream)))
409 }
410
411 async fn write_object(
412 &self,
413 request: GrpcRequest<tonic::Streaming<pb::WriteObjectRequest>>,
414 ) -> std::result::Result<GrpcResponse<pb::WriteObjectResponse>, Status> {
415 let response = self
416 .provider
417 .write_object(S3WriteObjectStream::new(request.into_inner()))
418 .await
419 .map_err(|error| rpc_status("s3 write object", error))?;
420 Ok(GrpcResponse::new(
421 write_object_response_to_proto(response)
422 .map_err(|error| rpc_status("s3 write object", error))?,
423 ))
424 }
425
426 async fn delete_object(
427 &self,
428 request: GrpcRequest<pb::DeleteObjectRequest>,
429 ) -> std::result::Result<GrpcResponse<()>, Status> {
430 self.provider
431 .delete_object(delete_object_request_from_proto(request.into_inner()))
432 .await
433 .map_err(|error| rpc_status("s3 delete object", error))?;
434 Ok(GrpcResponse::new(()))
435 }
436
437 async fn list_objects(
438 &self,
439 request: GrpcRequest<pb::ListObjectsRequest>,
440 ) -> std::result::Result<GrpcResponse<pb::ListObjectsResponse>, Status> {
441 let response = self
442 .provider
443 .list_objects(list_objects_request_from_proto(request.into_inner()))
444 .await
445 .map_err(|error| rpc_status("s3 list objects", error))?;
446 Ok(GrpcResponse::new(
447 list_objects_response_to_proto(response)
448 .map_err(|error| rpc_status("s3 list objects", error))?,
449 ))
450 }
451
452 async fn copy_object(
453 &self,
454 request: GrpcRequest<pb::CopyObjectRequest>,
455 ) -> std::result::Result<GrpcResponse<pb::CopyObjectResponse>, Status> {
456 let response = self
457 .provider
458 .copy_object(copy_object_request_from_proto(request.into_inner()))
459 .await
460 .map_err(|error| rpc_status("s3 copy object", error))?;
461 Ok(GrpcResponse::new(
462 copy_object_response_to_proto(response)
463 .map_err(|error| rpc_status("s3 copy object", error))?,
464 ))
465 }
466
467 async fn presign_object(
468 &self,
469 request: GrpcRequest<pb::PresignObjectRequest>,
470 ) -> std::result::Result<GrpcResponse<pb::PresignObjectResponse>, Status> {
471 let response = self
472 .provider
473 .presign_object(presign_object_request_from_proto(request.into_inner()))
474 .await
475 .map_err(|error| rpc_status("s3 presign object", error))?;
476 Ok(GrpcResponse::new(presign_object_response_to_proto(
477 response,
478 )))
479 }
480}
481
482fn object_ref_to_proto(reference: ObjectRef) -> pb::S3ObjectRef {
483 pb::S3ObjectRef {
484 key: reference.key,
485 version_id: reference.version_id,
486 }
487}
488
489fn object_ref_from_proto(reference: pb::S3ObjectRef) -> ObjectRef {
490 ObjectRef {
491 key: reference.key,
492 version_id: reference.version_id,
493 }
494}
495
496fn object_meta_to_proto(meta: ObjectMeta) -> ProviderResult<pb::S3ObjectMeta> {
497 Ok(pb::S3ObjectMeta {
498 r#ref: Some(object_ref_to_proto(meta.reference)),
499 etag: meta.etag,
500 size: meta.size,
501 content_type: meta.content_type,
502 last_modified: meta.last_modified.map(protocol::timestamp_from_system_time),
503 metadata: meta.metadata,
504 storage_class: meta.storage_class,
505 })
506}
507
508fn head_object_request_from_proto(request: pb::HeadObjectRequest) -> HeadObjectRequest {
509 HeadObjectRequest {
510 reference: request.r#ref.map(object_ref_from_proto),
511 }
512}
513
514fn head_object_response_to_proto(
515 response: HeadObjectResponse,
516) -> ProviderResult<pb::HeadObjectResponse> {
517 Ok(pb::HeadObjectResponse {
518 meta: response.meta.map(object_meta_to_proto).transpose()?,
519 })
520}
521
522fn read_object_request_from_proto(
523 request: pb::ReadObjectRequest,
524) -> ProviderResult<ReadObjectRequest> {
525 Ok(ReadObjectRequest {
526 reference: request.r#ref.map(object_ref_from_proto),
527 range: request.range.map(|range| ByteRange {
528 start: range.start,
529 end: range.end,
530 }),
531 if_match: request.if_match,
532 if_none_match: request.if_none_match,
533 if_modified_since: request
534 .if_modified_since
535 .as_ref()
536 .map(protocol::system_time_from_timestamp)
537 .transpose()?,
538 if_unmodified_since: request
539 .if_unmodified_since
540 .as_ref()
541 .map(protocol::system_time_from_timestamp)
542 .transpose()?,
543 })
544}
545
546fn read_object_frame_to_proto(frame: S3ReadObjectFrame) -> ProviderResult<pb::ReadObjectChunk> {
547 let result = match frame {
548 S3ReadObjectFrame::Meta(meta) => {
549 pb::read_object_chunk::Result::Meta(object_meta_to_proto(meta)?)
550 }
551 S3ReadObjectFrame::Data(data) => pb::read_object_chunk::Result::Data(data),
552 };
553 Ok(pb::ReadObjectChunk {
554 result: Some(result),
555 })
556}
557
558fn write_object_frame_from_proto(
559 frame: pb::WriteObjectRequest,
560) -> ProviderResult<S3WriteObjectFrame> {
561 Ok(match frame.msg {
562 Some(pb::write_object_request::Msg::Open(open)) => {
563 S3WriteObjectFrame::Open(Box::new(S3WriteObjectOpen {
564 reference: open.r#ref.map(object_ref_from_proto),
565 content_type: open.content_type,
566 cache_control: open.cache_control,
567 content_disposition: open.content_disposition,
568 content_encoding: open.content_encoding,
569 content_language: open.content_language,
570 metadata: open.metadata,
571 if_match: open.if_match,
572 if_none_match: open.if_none_match,
573 }))
574 }
575 Some(pb::write_object_request::Msg::Data(data)) => S3WriteObjectFrame::Data(data),
576 None => S3WriteObjectFrame::Empty,
577 })
578}
579
580fn write_object_response_to_proto(
581 response: WriteObjectResponse,
582) -> ProviderResult<pb::WriteObjectResponse> {
583 Ok(pb::WriteObjectResponse {
584 meta: response.meta.map(object_meta_to_proto).transpose()?,
585 })
586}
587
588fn delete_object_request_from_proto(request: pb::DeleteObjectRequest) -> DeleteObjectRequest {
589 DeleteObjectRequest {
590 reference: request.r#ref.map(object_ref_from_proto),
591 }
592}
593
594fn list_objects_request_from_proto(request: pb::ListObjectsRequest) -> ListObjectsRequest {
595 ListObjectsRequest {
596 prefix: request.prefix,
597 delimiter: request.delimiter,
598 continuation_token: request.continuation_token,
599 start_after: request.start_after,
600 max_keys: request.max_keys,
601 }
602}
603
604fn list_objects_response_to_proto(
605 response: ListObjectsResponse,
606) -> ProviderResult<pb::ListObjectsResponse> {
607 Ok(pb::ListObjectsResponse {
608 objects: response
609 .objects
610 .into_iter()
611 .map(object_meta_to_proto)
612 .collect::<ProviderResult<Vec<_>>>()?,
613 common_prefixes: response.common_prefixes,
614 next_continuation_token: response.next_continuation_token,
615 has_more: response.has_more,
616 })
617}
618
619fn copy_object_request_from_proto(request: pb::CopyObjectRequest) -> CopyObjectRequest {
620 CopyObjectRequest {
621 source: request.source.map(object_ref_from_proto),
622 destination: request.destination.map(object_ref_from_proto),
623 if_match: request.if_match,
624 if_none_match: request.if_none_match,
625 }
626}
627
628fn copy_object_response_to_proto(
629 response: CopyObjectResponse,
630) -> ProviderResult<pb::CopyObjectResponse> {
631 Ok(pb::CopyObjectResponse {
632 meta: response.meta.map(object_meta_to_proto).transpose()?,
633 })
634}
635
636fn presign_object_request_from_proto(request: pb::PresignObjectRequest) -> PresignObjectRequest {
637 PresignObjectRequest {
638 reference: request.r#ref.map(object_ref_from_proto),
639 method: presign_method_from_proto(request.method),
640 expires: Duration::from_secs(u64::try_from(request.expires_seconds).unwrap_or_default()),
641 content_type: request.content_type,
642 content_disposition: request.content_disposition,
643 headers: request.headers,
644 }
645}
646
647fn presign_object_response_to_proto(response: PresignObjectResponse) -> pb::PresignObjectResponse {
648 pb::PresignObjectResponse {
649 url: response.url,
650 method: presign_method_to_proto(response.method) as i32,
651 expires_at: response
652 .expires_at
653 .map(protocol::timestamp_from_system_time),
654 headers: response.headers,
655 }
656}
657
658fn presign_method_to_proto(method: PresignMethod) -> pb::PresignMethod {
659 match method {
660 PresignMethod::Unspecified => pb::PresignMethod::Unspecified,
661 PresignMethod::Get => pb::PresignMethod::Get,
662 PresignMethod::Put => pb::PresignMethod::Put,
663 PresignMethod::Delete => pb::PresignMethod::Delete,
664 PresignMethod::Head => pb::PresignMethod::Head,
665 }
666}
667
668fn presign_method_from_proto(method: i32) -> PresignMethod {
669 match pb::PresignMethod::try_from(method).unwrap_or(pb::PresignMethod::Unspecified) {
670 pb::PresignMethod::Get => PresignMethod::Get,
671 pb::PresignMethod::Put => PresignMethod::Put,
672 pb::PresignMethod::Delete => PresignMethod::Delete,
673 pb::PresignMethod::Head => PresignMethod::Head,
674 pb::PresignMethod::Unspecified => PresignMethod::Unspecified,
675 }
676}
677
#[cfg(test)]
mod tests {
    use tonic::Code;

    use super::*;

    /// Provider that keeps every default method of [`S3Provider`].
    struct EmptyS3Provider;

    #[async_trait]
    impl S3Provider for EmptyS3Provider {}

    /// Provider whose `head_object` fails with a hidden internal error.
    struct HiddenErrorS3Provider;

    #[async_trait]
    impl S3Provider for HiddenErrorS3Provider {
        async fn head_object(
            &self,
            _request: HeadObjectRequest,
        ) -> ProviderResult<HeadObjectResponse> {
            let failure = crate::Error::hidden_internal("backend detail");
            Err(failure)
        }
    }

    /// The default trait methods must surface as gRPC `Unimplemented`, with
    /// the provider's message passed through.
    #[tokio::test]
    async fn default_provider_methods_map_to_unimplemented_status() {
        let rpc = S3RpcServer::new(Arc::new(EmptyS3Provider));
        let request = GrpcRequest::new(pb::HeadObjectRequest::default());

        let outcome = pb::s3_server::S3::head_object(&rpc, request).await;
        let status = outcome.expect_err("default head_object should be unimplemented");

        assert_eq!(status.code(), Code::Unimplemented);
        assert_eq!(status.message(), "s3 head object is not implemented");
    }

    /// Hidden provider errors must not leak their detail text to the client.
    #[tokio::test]
    async fn provider_errors_are_sanitized_at_rpc_boundary() {
        let rpc = S3RpcServer::new(Arc::new(HiddenErrorS3Provider));
        let request = GrpcRequest::new(pb::HeadObjectRequest::default());

        let outcome = pb::s3_server::S3::head_object(&rpc, request).await;
        let status = outcome.expect_err("hidden provider error should fail");

        assert_eq!(status.code(), Code::Unknown);
        assert_eq!(status.message(), "s3 head object: internal error");
    }
}
728}