Skip to main content

gestalt/
s3_provider.rs

1use std::collections::BTreeMap;
2use std::pin::Pin;
3use std::sync::Arc;
4use std::time::{Duration, SystemTime};
5
6use tokio_stream::{Stream, StreamExt};
7use tonic::codegen::async_trait;
8use tonic::{Request as GrpcRequest, Response as GrpcResponse, Status};
9
10use crate::api::RuntimeMetadata;
11use crate::error::Result as ProviderResult;
12use crate::generated::v1 as pb;
13use crate::protocol;
14use crate::rpc_status::rpc_status;
15
16/// Server-streamed object read chunks returned by [`S3Provider::read_object`].
17pub type S3ReadObjectStream =
18    Pin<Box<dyn Stream<Item = ProviderResult<S3ReadObjectFrame>> + Send + 'static>>;
19type S3RpcReadObjectStream =
20    Pin<Box<dyn Stream<Item = std::result::Result<pb::ReadObjectChunk, Status>> + Send + 'static>>;
21
22#[derive(Clone, Debug, PartialEq)]
23/// One frame in a provider-authored object read stream.
24pub enum S3ReadObjectFrame {
25    /// The `Meta` variant.
26    Meta(ObjectMeta),
27    /// The `Data` variant.
28    Data(Vec<u8>),
29}
30
31#[derive(Clone, Debug, PartialEq)]
32/// One frame in a provider-backed object write stream.
33pub enum S3WriteObjectFrame {
34    /// The `Open` variant.
35    Open(Box<S3WriteObjectOpen>),
36    /// The `Data` variant.
37    Data(Vec<u8>),
38    /// The `Empty` variant.
39    Empty,
40}
41
42#[derive(Clone, Debug, Default, Eq, PartialEq)]
43/// Metadata frame that starts an object write stream.
44pub struct S3WriteObjectOpen {
45    /// The `reference` field.
46    pub reference: Option<ObjectRef>,
47    /// The `content_type` field.
48    pub content_type: String,
49    /// The `cache_control` field.
50    pub cache_control: String,
51    /// The `content_disposition` field.
52    pub content_disposition: String,
53    /// The `content_encoding` field.
54    pub content_encoding: String,
55    /// The `content_language` field.
56    pub content_language: String,
57    /// The `metadata` field.
58    pub metadata: BTreeMap<String, String>,
59    /// The `if_match` field.
60    pub if_match: String,
61    /// The `if_none_match` field.
62    pub if_none_match: String,
63}
64
65/// Client-streamed object write chunks passed to [`S3Provider::write_object`].
66pub struct S3WriteObjectStream {
67    inner: tonic::Streaming<pb::WriteObjectRequest>,
68}
69
70impl S3WriteObjectStream {
71    pub(crate) fn new(inner: tonic::Streaming<pb::WriteObjectRequest>) -> Self {
72        Self { inner }
73    }
74
75    /// Reads the next write request frame from the upload stream.
76    pub async fn message(&mut self) -> ProviderResult<Option<S3WriteObjectFrame>> {
77        self.inner
78            .message()
79            .await
80            .map_err(|error| crate::Error::new(error.to_string()))?
81            .map(write_object_frame_from_proto)
82            .transpose()
83    }
84}
85
86#[derive(Clone, Debug, Default, Eq, PartialEq)]
87/// Identifies one object or object version.
88pub struct ObjectRef {
89    /// Object key.
90    pub key: String,
91    /// Optional object version id.
92    pub version_id: String,
93}
94
95#[derive(Clone, Debug, Default, PartialEq)]
96/// Describes an object returned by the provider.
97pub struct ObjectMeta {
98    /// Object reference, including version when returned by the provider.
99    pub reference: ObjectRef,
100    /// Entity tag returned by storage.
101    pub etag: String,
102    /// Object size in bytes.
103    pub size: i64,
104    /// MIME content type.
105    pub content_type: String,
106    /// Last-modified timestamp, when known.
107    pub last_modified: Option<SystemTime>,
108    /// User metadata associated with the object.
109    pub metadata: BTreeMap<String, String>,
110    /// Storage class reported by the provider.
111    pub storage_class: String,
112}
113
114#[derive(Clone, Debug, Default, Eq, PartialEq)]
115/// Requests a half-open slice of an object's bytes.
116pub struct ByteRange {
117    /// Inclusive starting byte offset.
118    pub start: Option<i64>,
119    /// Inclusive ending byte offset.
120    pub end: Option<i64>,
121}
122
123#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
124/// Identifies the HTTP verb encoded into a presigned URL.
125pub enum PresignMethod {
126    /// Let the provider choose the default method.
127    #[default]
128    Unspecified,
129    /// HTTP GET.
130    Get,
131    /// HTTP PUT.
132    Put,
133    /// HTTP DELETE.
134    Delete,
135    /// HTTP HEAD.
136    Head,
137}
138
139#[derive(Clone, Debug, Default, PartialEq, Eq)]
140/// Fetches metadata for one object.
141pub struct HeadObjectRequest {
142    /// The `reference` field.
143    pub reference: Option<ObjectRef>,
144}
145
146#[derive(Clone, Debug, Default, PartialEq)]
147/// Returns object metadata.
148pub struct HeadObjectResponse {
149    /// The `meta` field.
150    pub meta: Option<ObjectMeta>,
151}
152
153#[derive(Clone, Debug, Default, PartialEq)]
154/// Opens a streaming object read.
155pub struct ReadObjectRequest {
156    /// The `reference` field.
157    pub reference: Option<ObjectRef>,
158    /// The `range` field.
159    pub range: Option<ByteRange>,
160    /// The `if_match` field.
161    pub if_match: String,
162    /// The `if_none_match` field.
163    pub if_none_match: String,
164    /// The `if_modified_since` field.
165    pub if_modified_since: Option<SystemTime>,
166    /// The `if_unmodified_since` field.
167    pub if_unmodified_since: Option<SystemTime>,
168}
169
170#[derive(Clone, Debug, Default, PartialEq)]
171/// Returns metadata for a committed object write.
172pub struct WriteObjectResponse {
173    /// The `meta` field.
174    pub meta: Option<ObjectMeta>,
175}
176
177#[derive(Clone, Debug, Default, PartialEq, Eq)]
178/// Deletes one object.
179pub struct DeleteObjectRequest {
180    /// The `reference` field.
181    pub reference: Option<ObjectRef>,
182}
183
184#[derive(Clone, Debug, Default, PartialEq, Eq)]
185/// Lists objects in the provider's configured bucket.
186pub struct ListObjectsRequest {
187    /// The `prefix` field.
188    pub prefix: String,
189    /// The `delimiter` field.
190    pub delimiter: String,
191    /// The `continuation_token` field.
192    pub continuation_token: String,
193    /// The `start_after` field.
194    pub start_after: String,
195    /// The `max_keys` field.
196    pub max_keys: i32,
197}
198
199#[derive(Clone, Debug, Default, PartialEq)]
200/// One page of list-objects results.
201pub struct ListObjectsResponse {
202    /// The `objects` field.
203    pub objects: Vec<ObjectMeta>,
204    /// The `common_prefixes` field.
205    pub common_prefixes: Vec<String>,
206    /// The `next_continuation_token` field.
207    pub next_continuation_token: String,
208    /// The `has_more` field.
209    pub has_more: bool,
210}
211
212#[derive(Clone, Debug, Default, PartialEq, Eq)]
213/// Copies one object to another location.
214pub struct CopyObjectRequest {
215    /// The `source` field.
216    pub source: Option<ObjectRef>,
217    /// The `destination` field.
218    pub destination: Option<ObjectRef>,
219    /// The `if_match` field.
220    pub if_match: String,
221    /// The `if_none_match` field.
222    pub if_none_match: String,
223}
224
225#[derive(Clone, Debug, Default, PartialEq)]
226/// Returns metadata for a copied object.
227pub struct CopyObjectResponse {
228    /// The `meta` field.
229    pub meta: Option<ObjectMeta>,
230}
231
232#[derive(Clone, Debug, Default, PartialEq, Eq)]
233/// Asks the provider to mint a presigned URL.
234pub struct PresignObjectRequest {
235    /// The `reference` field.
236    pub reference: Option<ObjectRef>,
237    /// The `method` field.
238    pub method: PresignMethod,
239    /// The `expires` field.
240    pub expires: Duration,
241    /// The `content_type` field.
242    pub content_type: String,
243    /// The `content_disposition` field.
244    pub content_disposition: String,
245    /// The `headers` field.
246    pub headers: BTreeMap<String, String>,
247}
248
249#[derive(Clone, Debug, Default, PartialEq)]
250/// Returns a presigned URL plus required headers.
251pub struct PresignObjectResponse {
252    /// The `url` field.
253    pub url: String,
254    /// The `method` field.
255    pub method: PresignMethod,
256    /// The `expires_at` field.
257    pub expires_at: Option<SystemTime>,
258    /// The `headers` field.
259    pub headers: BTreeMap<String, String>,
260}
261
262#[async_trait]
263/// Lifecycle and RPC contract for S3-compatible providers.
264pub trait S3Provider: Send + Sync + 'static {
265    /// Configures the provider before it starts serving requests.
266    async fn configure(
267        &self,
268        _name: &str,
269        _config: serde_json::Map<String, serde_json::Value>,
270    ) -> ProviderResult<()> {
271        Ok(())
272    }
273
274    /// Returns runtime metadata that should augment the static manifest.
275    fn metadata(&self) -> Option<RuntimeMetadata> {
276        None
277    }
278
279    /// Returns non-fatal warnings the host should surface to users.
280    fn warnings(&self) -> Vec<String> {
281        Vec::new()
282    }
283
284    /// Performs an optional health check.
285    async fn health_check(&self) -> ProviderResult<()> {
286        Ok(())
287    }
288
289    /// Starts provider-owned background work after configuration.
290    async fn start(&self) -> ProviderResult<()> {
291        Ok(())
292    }
293
294    /// Shuts the provider down before the runtime exits.
295    async fn close(&self) -> ProviderResult<()> {
296        Ok(())
297    }
298
299    /// Returns object metadata without reading the object body.
300    async fn head_object(&self, _request: HeadObjectRequest) -> ProviderResult<HeadObjectResponse> {
301        Err(crate::Error::unimplemented(
302            "s3 head object is not implemented",
303        ))
304    }
305
306    /// Streams object metadata followed by object data chunks.
307    async fn read_object(&self, _request: ReadObjectRequest) -> ProviderResult<S3ReadObjectStream> {
308        Err(crate::Error::unimplemented(
309            "s3 read object is not implemented",
310        ))
311    }
312
313    /// Consumes a streamed upload and returns metadata for the written object.
314    async fn write_object(
315        &self,
316        _request: S3WriteObjectStream,
317    ) -> ProviderResult<WriteObjectResponse> {
318        Err(crate::Error::unimplemented(
319            "s3 write object is not implemented",
320        ))
321    }
322
323    /// Deletes one object or object version.
324    async fn delete_object(&self, _request: DeleteObjectRequest) -> ProviderResult<()> {
325        Err(crate::Error::unimplemented(
326            "s3 delete object is not implemented",
327        ))
328    }
329
330    /// Lists objects in the provider's configured bucket using S3-style pagination and delimiters.
331    async fn list_objects(
332        &self,
333        _request: ListObjectsRequest,
334    ) -> ProviderResult<ListObjectsResponse> {
335        Err(crate::Error::unimplemented(
336            "s3 list objects is not implemented",
337        ))
338    }
339
340    /// Copies one object to another object reference.
341    async fn copy_object(&self, _request: CopyObjectRequest) -> ProviderResult<CopyObjectResponse> {
342        Err(crate::Error::unimplemented(
343            "s3 copy object is not implemented",
344        ))
345    }
346
347    /// Creates a presigned URL for direct object access.
348    async fn presign_object(
349        &self,
350        _request: PresignObjectRequest,
351    ) -> ProviderResult<PresignObjectResponse> {
352        Err(crate::Error::unimplemented(
353            "s3 presign object is not implemented",
354        ))
355    }
356}
357
358#[derive(Clone)]
359pub(crate) struct S3RpcServer<P> {
360    provider: Arc<P>,
361}
362
363impl<P> S3RpcServer<P> {
364    pub(crate) fn new(provider: Arc<P>) -> Self {
365        Self { provider }
366    }
367}
368
369#[async_trait]
370impl<P> pb::s3_server::S3 for S3RpcServer<P>
371where
372    P: S3Provider,
373{
374    type ReadObjectStream = S3RpcReadObjectStream;
375
376    async fn head_object(
377        &self,
378        request: GrpcRequest<pb::HeadObjectRequest>,
379    ) -> std::result::Result<GrpcResponse<pb::HeadObjectResponse>, Status> {
380        let response = self
381            .provider
382            .head_object(head_object_request_from_proto(request.into_inner()))
383            .await
384            .map_err(|error| rpc_status("s3 head object", error))?;
385        Ok(GrpcResponse::new(
386            head_object_response_to_proto(response)
387                .map_err(|error| rpc_status("s3 head object", error))?,
388        ))
389    }
390
391    async fn read_object(
392        &self,
393        request: GrpcRequest<pb::ReadObjectRequest>,
394    ) -> std::result::Result<GrpcResponse<Self::ReadObjectStream>, Status> {
395        let stream = self
396            .provider
397            .read_object(
398                read_object_request_from_proto(request.into_inner())
399                    .map_err(|error| rpc_status("s3 read object", error))?,
400            )
401            .await
402            .map_err(|error| rpc_status("s3 read object", error))?
403            .map(|chunk| {
404                chunk
405                    .and_then(read_object_frame_to_proto)
406                    .map_err(|error| rpc_status("s3 read object stream", error))
407            });
408        Ok(GrpcResponse::new(Box::pin(stream)))
409    }
410
411    async fn write_object(
412        &self,
413        request: GrpcRequest<tonic::Streaming<pb::WriteObjectRequest>>,
414    ) -> std::result::Result<GrpcResponse<pb::WriteObjectResponse>, Status> {
415        let response = self
416            .provider
417            .write_object(S3WriteObjectStream::new(request.into_inner()))
418            .await
419            .map_err(|error| rpc_status("s3 write object", error))?;
420        Ok(GrpcResponse::new(
421            write_object_response_to_proto(response)
422                .map_err(|error| rpc_status("s3 write object", error))?,
423        ))
424    }
425
426    async fn delete_object(
427        &self,
428        request: GrpcRequest<pb::DeleteObjectRequest>,
429    ) -> std::result::Result<GrpcResponse<()>, Status> {
430        self.provider
431            .delete_object(delete_object_request_from_proto(request.into_inner()))
432            .await
433            .map_err(|error| rpc_status("s3 delete object", error))?;
434        Ok(GrpcResponse::new(()))
435    }
436
437    async fn list_objects(
438        &self,
439        request: GrpcRequest<pb::ListObjectsRequest>,
440    ) -> std::result::Result<GrpcResponse<pb::ListObjectsResponse>, Status> {
441        let response = self
442            .provider
443            .list_objects(list_objects_request_from_proto(request.into_inner()))
444            .await
445            .map_err(|error| rpc_status("s3 list objects", error))?;
446        Ok(GrpcResponse::new(
447            list_objects_response_to_proto(response)
448                .map_err(|error| rpc_status("s3 list objects", error))?,
449        ))
450    }
451
452    async fn copy_object(
453        &self,
454        request: GrpcRequest<pb::CopyObjectRequest>,
455    ) -> std::result::Result<GrpcResponse<pb::CopyObjectResponse>, Status> {
456        let response = self
457            .provider
458            .copy_object(copy_object_request_from_proto(request.into_inner()))
459            .await
460            .map_err(|error| rpc_status("s3 copy object", error))?;
461        Ok(GrpcResponse::new(
462            copy_object_response_to_proto(response)
463                .map_err(|error| rpc_status("s3 copy object", error))?,
464        ))
465    }
466
467    async fn presign_object(
468        &self,
469        request: GrpcRequest<pb::PresignObjectRequest>,
470    ) -> std::result::Result<GrpcResponse<pb::PresignObjectResponse>, Status> {
471        let response = self
472            .provider
473            .presign_object(presign_object_request_from_proto(request.into_inner()))
474            .await
475            .map_err(|error| rpc_status("s3 presign object", error))?;
476        Ok(GrpcResponse::new(presign_object_response_to_proto(
477            response,
478        )))
479    }
480}
481
482fn object_ref_to_proto(reference: ObjectRef) -> pb::S3ObjectRef {
483    pb::S3ObjectRef {
484        key: reference.key,
485        version_id: reference.version_id,
486    }
487}
488
489fn object_ref_from_proto(reference: pb::S3ObjectRef) -> ObjectRef {
490    ObjectRef {
491        key: reference.key,
492        version_id: reference.version_id,
493    }
494}
495
496fn object_meta_to_proto(meta: ObjectMeta) -> ProviderResult<pb::S3ObjectMeta> {
497    Ok(pb::S3ObjectMeta {
498        r#ref: Some(object_ref_to_proto(meta.reference)),
499        etag: meta.etag,
500        size: meta.size,
501        content_type: meta.content_type,
502        last_modified: meta.last_modified.map(protocol::timestamp_from_system_time),
503        metadata: meta.metadata,
504        storage_class: meta.storage_class,
505    })
506}
507
508fn head_object_request_from_proto(request: pb::HeadObjectRequest) -> HeadObjectRequest {
509    HeadObjectRequest {
510        reference: request.r#ref.map(object_ref_from_proto),
511    }
512}
513
514fn head_object_response_to_proto(
515    response: HeadObjectResponse,
516) -> ProviderResult<pb::HeadObjectResponse> {
517    Ok(pb::HeadObjectResponse {
518        meta: response.meta.map(object_meta_to_proto).transpose()?,
519    })
520}
521
522fn read_object_request_from_proto(
523    request: pb::ReadObjectRequest,
524) -> ProviderResult<ReadObjectRequest> {
525    Ok(ReadObjectRequest {
526        reference: request.r#ref.map(object_ref_from_proto),
527        range: request.range.map(|range| ByteRange {
528            start: range.start,
529            end: range.end,
530        }),
531        if_match: request.if_match,
532        if_none_match: request.if_none_match,
533        if_modified_since: request
534            .if_modified_since
535            .as_ref()
536            .map(protocol::system_time_from_timestamp)
537            .transpose()?,
538        if_unmodified_since: request
539            .if_unmodified_since
540            .as_ref()
541            .map(protocol::system_time_from_timestamp)
542            .transpose()?,
543    })
544}
545
546fn read_object_frame_to_proto(frame: S3ReadObjectFrame) -> ProviderResult<pb::ReadObjectChunk> {
547    let result = match frame {
548        S3ReadObjectFrame::Meta(meta) => {
549            pb::read_object_chunk::Result::Meta(object_meta_to_proto(meta)?)
550        }
551        S3ReadObjectFrame::Data(data) => pb::read_object_chunk::Result::Data(data),
552    };
553    Ok(pb::ReadObjectChunk {
554        result: Some(result),
555    })
556}
557
558fn write_object_frame_from_proto(
559    frame: pb::WriteObjectRequest,
560) -> ProviderResult<S3WriteObjectFrame> {
561    Ok(match frame.msg {
562        Some(pb::write_object_request::Msg::Open(open)) => {
563            S3WriteObjectFrame::Open(Box::new(S3WriteObjectOpen {
564                reference: open.r#ref.map(object_ref_from_proto),
565                content_type: open.content_type,
566                cache_control: open.cache_control,
567                content_disposition: open.content_disposition,
568                content_encoding: open.content_encoding,
569                content_language: open.content_language,
570                metadata: open.metadata,
571                if_match: open.if_match,
572                if_none_match: open.if_none_match,
573            }))
574        }
575        Some(pb::write_object_request::Msg::Data(data)) => S3WriteObjectFrame::Data(data),
576        None => S3WriteObjectFrame::Empty,
577    })
578}
579
580fn write_object_response_to_proto(
581    response: WriteObjectResponse,
582) -> ProviderResult<pb::WriteObjectResponse> {
583    Ok(pb::WriteObjectResponse {
584        meta: response.meta.map(object_meta_to_proto).transpose()?,
585    })
586}
587
588fn delete_object_request_from_proto(request: pb::DeleteObjectRequest) -> DeleteObjectRequest {
589    DeleteObjectRequest {
590        reference: request.r#ref.map(object_ref_from_proto),
591    }
592}
593
594fn list_objects_request_from_proto(request: pb::ListObjectsRequest) -> ListObjectsRequest {
595    ListObjectsRequest {
596        prefix: request.prefix,
597        delimiter: request.delimiter,
598        continuation_token: request.continuation_token,
599        start_after: request.start_after,
600        max_keys: request.max_keys,
601    }
602}
603
604fn list_objects_response_to_proto(
605    response: ListObjectsResponse,
606) -> ProviderResult<pb::ListObjectsResponse> {
607    Ok(pb::ListObjectsResponse {
608        objects: response
609            .objects
610            .into_iter()
611            .map(object_meta_to_proto)
612            .collect::<ProviderResult<Vec<_>>>()?,
613        common_prefixes: response.common_prefixes,
614        next_continuation_token: response.next_continuation_token,
615        has_more: response.has_more,
616    })
617}
618
619fn copy_object_request_from_proto(request: pb::CopyObjectRequest) -> CopyObjectRequest {
620    CopyObjectRequest {
621        source: request.source.map(object_ref_from_proto),
622        destination: request.destination.map(object_ref_from_proto),
623        if_match: request.if_match,
624        if_none_match: request.if_none_match,
625    }
626}
627
628fn copy_object_response_to_proto(
629    response: CopyObjectResponse,
630) -> ProviderResult<pb::CopyObjectResponse> {
631    Ok(pb::CopyObjectResponse {
632        meta: response.meta.map(object_meta_to_proto).transpose()?,
633    })
634}
635
636fn presign_object_request_from_proto(request: pb::PresignObjectRequest) -> PresignObjectRequest {
637    PresignObjectRequest {
638        reference: request.r#ref.map(object_ref_from_proto),
639        method: presign_method_from_proto(request.method),
640        expires: Duration::from_secs(u64::try_from(request.expires_seconds).unwrap_or_default()),
641        content_type: request.content_type,
642        content_disposition: request.content_disposition,
643        headers: request.headers,
644    }
645}
646
647fn presign_object_response_to_proto(response: PresignObjectResponse) -> pb::PresignObjectResponse {
648    pb::PresignObjectResponse {
649        url: response.url,
650        method: presign_method_to_proto(response.method) as i32,
651        expires_at: response
652            .expires_at
653            .map(protocol::timestamp_from_system_time),
654        headers: response.headers,
655    }
656}
657
658fn presign_method_to_proto(method: PresignMethod) -> pb::PresignMethod {
659    match method {
660        PresignMethod::Unspecified => pb::PresignMethod::Unspecified,
661        PresignMethod::Get => pb::PresignMethod::Get,
662        PresignMethod::Put => pb::PresignMethod::Put,
663        PresignMethod::Delete => pb::PresignMethod::Delete,
664        PresignMethod::Head => pb::PresignMethod::Head,
665    }
666}
667
668fn presign_method_from_proto(method: i32) -> PresignMethod {
669    match pb::PresignMethod::try_from(method).unwrap_or(pb::PresignMethod::Unspecified) {
670        pb::PresignMethod::Get => PresignMethod::Get,
671        pb::PresignMethod::Put => PresignMethod::Put,
672        pb::PresignMethod::Delete => PresignMethod::Delete,
673        pb::PresignMethod::Head => PresignMethod::Head,
674        pb::PresignMethod::Unspecified => PresignMethod::Unspecified,
675    }
676}
677
678#[cfg(test)]
679mod tests {
680    use tonic::Code;
681
682    use super::*;
683
684    struct EmptyS3Provider;
685
686    #[async_trait]
687    impl S3Provider for EmptyS3Provider {}
688
689    struct HiddenErrorS3Provider;
690
691    #[async_trait]
692    impl S3Provider for HiddenErrorS3Provider {
693        async fn head_object(
694            &self,
695            _request: HeadObjectRequest,
696        ) -> ProviderResult<HeadObjectResponse> {
697            Err(crate::Error::hidden_internal("backend detail"))
698        }
699    }
700
701    #[tokio::test]
702    async fn default_provider_methods_map_to_unimplemented_status() {
703        let server = S3RpcServer::new(Arc::new(EmptyS3Provider));
704        let status = <S3RpcServer<EmptyS3Provider> as pb::s3_server::S3>::head_object(
705            &server,
706            GrpcRequest::new(pb::HeadObjectRequest::default()),
707        )
708        .await
709        .expect_err("default head_object should be unimplemented");
710
711        assert_eq!(status.code(), Code::Unimplemented);
712        assert_eq!(status.message(), "s3 head object is not implemented");
713    }
714
715    #[tokio::test]
716    async fn provider_errors_are_sanitized_at_rpc_boundary() {
717        let server = S3RpcServer::new(Arc::new(HiddenErrorS3Provider));
718        let status = <S3RpcServer<HiddenErrorS3Provider> as pb::s3_server::S3>::head_object(
719            &server,
720            GrpcRequest::new(pb::HeadObjectRequest::default()),
721        )
722        .await
723        .expect_err("hidden provider error should fail");
724
725        assert_eq!(status.code(), Code::Unknown);
726        assert_eq!(status.message(), "s3 head object: internal error");
727    }
728}