Skip to main content

gestalt/
s3.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use hyper_util::rt::TokioIo;
6use serde::de::DeserializeOwned;
7use tokio_stream::iter;
8use tonic::codegen::async_trait;
9use tonic::transport::{Channel, Endpoint, Uri};
10use tower::service_fn;
11
12use crate::api::RuntimeMetadata;
13use crate::error::Result as ProviderResult;
14use crate::generated::v1::{self as pb, s3_client::S3Client as ProtoS3Client};
15
/// Result alias for the client-side API in this file; failures are reported as [`S3Error`].
type ClientResult<T> = std::result::Result<T, S3Error>;

/// Base name of the environment variable that holds the Unix socket path of the S3 service.
/// Named connections append a suffix to it (see `s3_socket_env`).
pub const ENV_S3_SOCKET: &str = "GESTALT_S3_SOCKET";
/// Upper bound, in bytes, of the slices an in-memory body is split into by `S3::write_object`.
const WRITE_CHUNK_SIZE: usize = 64 * 1024;
20
/// Errors returned by the S3 client API in this file.
///
/// The first three variants are produced by `map_status` from specific gRPC status codes;
/// every other gRPC status is carried verbatim in [`S3Error::Status`].
#[derive(Debug, thiserror::Error)]
pub enum S3Error {
    /// The service answered with gRPC code `NotFound`.
    #[error("not found")]
    NotFound,
    /// The service answered with gRPC code `FailedPrecondition`.
    #[error("precondition failed")]
    PreconditionFailed,
    /// The service answered with gRPC code `OutOfRange`.
    #[error("invalid range")]
    InvalidRange,
    /// The response did not have the shape this client expects, e.g. a response without
    /// metadata or a read stream whose frames arrive in an unexpected order.
    #[error("{0}")]
    Protocol(String),
    /// Error raised by the tonic transport layer (for example while building the channel).
    #[error("{0}")]
    Transport(#[from] tonic::transport::Error),
    /// A gRPC status that has no dedicated variant.
    #[error("{0}")]
    Status(#[from] tonic::Status),
    /// The socket environment variable could not be read; the payload is the message.
    #[error("{0}")]
    Env(String),
    /// JSON (de)serialization failed.
    #[error("{0}")]
    Json(#[from] serde_json::Error),
    /// Bytes could not be converted into a `String` because they are not valid UTF-8.
    #[error("{0}")]
    Utf8(#[from] std::string::FromUtf8Error),
}
42
/// Identifies an object by bucket, key and version id.
///
/// Converted field-for-field into `pb::S3ObjectRef` when sent to the service.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ObjectRef {
    /// Name of the bucket that holds the object.
    pub bucket: String,
    /// Key of the object inside the bucket.
    pub key: String,
    /// Version id; `S3::object` leaves it empty.
    /// NOTE(review): an empty id presumably selects the current version — confirm with the service.
    pub version_id: String,
}
49
/// Object metadata as reported by the service (built from `pb::S3ObjectMeta`).
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ObjectMeta {
    /// Reference of the object; defaults to an empty reference when the service omits it.
    pub reference: ObjectRef,
    /// Entity tag reported by the service.
    pub etag: String,
    /// Object size as reported by the service (presumably bytes — TODO confirm).
    pub size: i64,
    /// Content type reported by the service.
    pub content_type: String,
    /// Last-modified timestamp, if the service supplied one.
    pub last_modified: Option<prost_types::Timestamp>,
    /// Key/value metadata attached to the object.
    pub metadata: BTreeMap<String, String>,
    /// Storage class reported by the service.
    pub storage_class: String,
}
60
/// Optional byte range for a read; both bounds are forwarded unchanged to `pb::ByteRange`.
///
/// NOTE(review): whether `end` is inclusive and how a missing bound is interpreted is decided
/// by the service and is not visible in this file.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ByteRange {
    /// Start offset, if any.
    pub start: Option<i64>,
    /// End offset, if any.
    pub end: Option<i64>,
}
66
/// Optional parameters for `S3::read_object`; every field is copied into `pb::ReadObjectRequest`.
///
/// The `Default` value has no range, empty strings and no timestamps.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ReadOptions {
    /// Byte range to read, if any.
    pub range: Option<ByteRange>,
    /// Value for the request's `if_match` field.
    pub if_match: String,
    /// Value for the request's `if_none_match` field.
    pub if_none_match: String,
    /// Value for the request's `if_modified_since` field.
    pub if_modified_since: Option<prost_types::Timestamp>,
    /// Value for the request's `if_unmodified_since` field.
    pub if_unmodified_since: Option<prost_types::Timestamp>,
}
75
/// Optional parameters for writes; every field is copied into the `pb::WriteObjectOpen`
/// frame that starts a write stream.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct WriteOptions {
    /// Content type of the object. `Object::write_json` fills in `application/json`
    /// when this is empty.
    pub content_type: String,
    /// Value for the open frame's `cache_control` field.
    pub cache_control: String,
    /// Value for the open frame's `content_disposition` field.
    pub content_disposition: String,
    /// Value for the open frame's `content_encoding` field.
    pub content_encoding: String,
    /// Value for the open frame's `content_language` field.
    pub content_language: String,
    /// Key/value metadata to attach to the object.
    pub metadata: BTreeMap<String, String>,
    /// Value for the open frame's `if_match` field.
    pub if_match: String,
    /// Value for the open frame's `if_none_match` field.
    pub if_none_match: String,
}
87
/// Parameters for `S3::list_objects`; every field is copied into `pb::ListObjectsRequest`.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct ListOptions {
    /// Bucket to list.
    pub bucket: String,
    /// Value for the request's `prefix` field.
    pub prefix: String,
    /// Value for the request's `delimiter` field.
    pub delimiter: String,
    /// Continuation token; presumably the `next_continuation_token` of a previous
    /// [`ListPage`] — confirm against the service contract.
    pub continuation_token: String,
    /// Value for the request's `start_after` field.
    pub start_after: String,
    /// Value for the request's `max_keys` field.
    /// NOTE(review): the meaning of `0` is defined by the service.
    pub max_keys: i32,
}
97
/// One page of a listing, built from `pb::ListObjectsResponse`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ListPage {
    /// Metadata of the objects on this page.
    pub objects: Vec<ObjectMeta>,
    /// Common prefixes reported by the service.
    pub common_prefixes: Vec<String>,
    /// Continuation token reported by the service.
    pub next_continuation_token: String,
    /// The service's `has_more` flag.
    pub has_more: bool,
}
105
/// Optional parameters for `S3::copy_object`; both fields are copied into `pb::CopyObjectRequest`.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct CopyOptions {
    /// Value for the request's `if_match` field.
    pub if_match: String,
    /// Value for the request's `if_none_match` field.
    pub if_none_match: String,
}
111
/// Method a presigned URL is requested for; mirrors `pb::PresignMethod` variant by variant.
///
/// NOTE(review): the variants presumably name HTTP verbs — confirm against the proto docs.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum PresignMethod {
    /// No method chosen; this is the `Default` value.
    #[default]
    Unspecified,
    Get,
    Put,
    Delete,
    Head,
}
121
/// Optional parameters for `S3::presign_object`.
#[derive(Clone, Debug, Default, Eq, PartialEq)]
pub struct PresignOptions {
    /// Method the URL is requested for.
    pub method: PresignMethod,
    /// Requested lifetime; sent to the service as whole seconds.
    pub expires: Duration,
    /// Value for the request's `content_type` field.
    pub content_type: String,
    /// Value for the request's `content_disposition` field.
    pub content_disposition: String,
    /// Value for the request's `headers` field.
    pub headers: BTreeMap<String, String>,
}
130
/// Outcome of a presign call, built from `pb::PresignObjectResponse`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct PresignResult {
    /// The presigned URL.
    pub url: String,
    /// Method reported by the service; when the service reports `Unspecified`, the method
    /// that was requested is stored instead (see `presign_result_from_proto`).
    pub method: PresignMethod,
    /// Expiry timestamp, if the service supplied one.
    pub expires_at: Option<prost_types::Timestamp>,
    /// Headers reported by the service alongside the URL.
    pub headers: BTreeMap<String, String>,
}
138
/// Server-side provider trait: the generated `S3` service trait plus optional hooks.
///
/// Every hook has a default implementation, so an implementor only has to supply the
/// RPC methods of `pb::s3_server::S3`.
/// NOTE(review): when the host invokes each hook is not visible in this file.
#[async_trait]
pub trait S3Provider: pb::s3_server::S3 + Send + Sync + 'static {
    /// Receives the provider's name and JSON configuration.
    /// The default ignores both arguments and succeeds.
    async fn configure(
        &self,
        _name: &str,
        _config: serde_json::Map<String, serde_json::Value>,
    ) -> ProviderResult<()> {
        Ok(())
    }

    /// Optional runtime metadata for this provider; the default reports none.
    fn metadata(&self) -> Option<RuntimeMetadata> {
        None
    }

    /// Warning messages to surface; the default returns an empty list.
    fn warnings(&self) -> Vec<String> {
        Vec::new()
    }

    /// Health probe; the default always succeeds.
    async fn health_check(&self) -> ProviderResult<()> {
        Ok(())
    }

    /// Shutdown hook; the default does nothing and succeeds.
    async fn close(&self) -> ProviderResult<()> {
        Ok(())
    }
}
165
166#[async_trait]
167impl<T> pb::s3_server::S3 for Arc<T>
168where
169    T: S3Provider,
170{
171    type ReadObjectStream = <T as pb::s3_server::S3>::ReadObjectStream;
172
173    async fn head_object(
174        &self,
175        request: tonic::Request<pb::HeadObjectRequest>,
176    ) -> std::result::Result<tonic::Response<pb::HeadObjectResponse>, tonic::Status> {
177        <T as pb::s3_server::S3>::head_object(self.as_ref(), request).await
178    }
179
180    async fn read_object(
181        &self,
182        request: tonic::Request<pb::ReadObjectRequest>,
183    ) -> std::result::Result<tonic::Response<Self::ReadObjectStream>, tonic::Status> {
184        <T as pb::s3_server::S3>::read_object(self.as_ref(), request).await
185    }
186
187    async fn write_object(
188        &self,
189        request: tonic::Request<tonic::Streaming<pb::WriteObjectRequest>>,
190    ) -> std::result::Result<tonic::Response<pb::WriteObjectResponse>, tonic::Status> {
191        <T as pb::s3_server::S3>::write_object(self.as_ref(), request).await
192    }
193
194    async fn delete_object(
195        &self,
196        request: tonic::Request<pb::DeleteObjectRequest>,
197    ) -> std::result::Result<tonic::Response<()>, tonic::Status> {
198        <T as pb::s3_server::S3>::delete_object(self.as_ref(), request).await
199    }
200
201    async fn list_objects(
202        &self,
203        request: tonic::Request<pb::ListObjectsRequest>,
204    ) -> std::result::Result<tonic::Response<pb::ListObjectsResponse>, tonic::Status> {
205        <T as pb::s3_server::S3>::list_objects(self.as_ref(), request).await
206    }
207
208    async fn copy_object(
209        &self,
210        request: tonic::Request<pb::CopyObjectRequest>,
211    ) -> std::result::Result<tonic::Response<pb::CopyObjectResponse>, tonic::Status> {
212        <T as pb::s3_server::S3>::copy_object(self.as_ref(), request).await
213    }
214
215    async fn presign_object(
216        &self,
217        request: tonic::Request<pb::PresignObjectRequest>,
218    ) -> std::result::Result<tonic::Response<pb::PresignObjectResponse>, tonic::Status> {
219        <T as pb::s3_server::S3>::presign_object(self.as_ref(), request).await
220    }
221}
222
223pub struct S3 {
224    client: ProtoS3Client<Channel>,
225}
226
227impl S3 {
228    pub async fn connect() -> ClientResult<Self> {
229        Self::connect_named("").await
230    }
231
232    pub async fn connect_named(name: &str) -> ClientResult<Self> {
233        let env_name = s3_socket_env(name);
234        let socket_path =
235            std::env::var(&env_name).map_err(|_| S3Error::Env(format!("{env_name} is not set")))?;
236
237        let channel = Endpoint::try_from("http://[::]:50051")?
238            .connect_with_connector(service_fn(move |_: Uri| {
239                let path = socket_path.clone();
240                async move {
241                    tokio::net::UnixStream::connect(path)
242                        .await
243                        .map(TokioIo::new)
244                }
245            }))
246            .await?;
247
248        Ok(Self {
249            client: ProtoS3Client::new(channel),
250        })
251    }
252
253    pub fn object(&self, bucket: &str, key: &str) -> Object {
254        Object {
255            client: self.client.clone(),
256            reference: ObjectRef {
257                bucket: bucket.to_string(),
258                key: key.to_string(),
259                version_id: String::new(),
260            },
261        }
262    }
263
264    pub fn object_version(&self, bucket: &str, key: &str, version_id: &str) -> Object {
265        Object {
266            client: self.client.clone(),
267            reference: ObjectRef {
268                bucket: bucket.to_string(),
269                key: key.to_string(),
270                version_id: version_id.to_string(),
271            },
272        }
273    }
274
275    pub async fn head_object(&mut self, reference: ObjectRef) -> ClientResult<ObjectMeta> {
276        let response = self
277            .client
278            .head_object(pb::HeadObjectRequest {
279                r#ref: Some(object_ref_to_proto(reference)),
280            })
281            .await
282            .map_err(map_status)?;
283        required_object_meta(
284            response.into_inner().meta,
285            "head object response missing metadata",
286        )
287    }
288
289    pub async fn read_object(
290        &mut self,
291        reference: ObjectRef,
292        options: Option<ReadOptions>,
293    ) -> ClientResult<ObjectReader> {
294        let options = options.unwrap_or_default();
295        let mut stream = self
296            .client
297            .read_object(pb::ReadObjectRequest {
298                r#ref: Some(object_ref_to_proto(reference)),
299                range: options.range.map(byte_range_to_proto),
300                if_match: options.if_match,
301                if_none_match: options.if_none_match,
302                if_modified_since: options.if_modified_since,
303                if_unmodified_since: options.if_unmodified_since,
304            })
305            .await
306            .map_err(map_status)?
307            .into_inner();
308
309        let first =
310            stream.message().await.map_err(map_status)?.ok_or_else(|| {
311                S3Error::Protocol("read stream ended before metadata".to_string())
312            })?;
313
314        let meta = match first.result {
315            Some(pb::read_object_chunk::Result::Meta(meta)) => object_meta_from_proto(meta),
316            Some(pb::read_object_chunk::Result::Data(_)) => {
317                return Err(S3Error::Protocol(
318                    "read stream started with data instead of metadata".to_string(),
319                ));
320            }
321            None => {
322                return Err(S3Error::Protocol(
323                    "read stream started with an empty frame".to_string(),
324                ));
325            }
326        };
327
328        Ok(ObjectReader { meta, stream })
329    }
330
331    pub async fn write_object<B>(
332        &mut self,
333        reference: ObjectRef,
334        body: B,
335        options: Option<WriteOptions>,
336    ) -> ClientResult<ObjectMeta>
337    where
338        B: AsRef<[u8]>,
339    {
340        let options = options.unwrap_or_default();
341        let open = pb::WriteObjectRequest {
342            msg: Some(pb::write_object_request::Msg::Open(pb::WriteObjectOpen {
343                r#ref: Some(object_ref_to_proto(reference)),
344                content_type: options.content_type,
345                cache_control: options.cache_control,
346                content_disposition: options.content_disposition,
347                content_encoding: options.content_encoding,
348                content_language: options.content_language,
349                metadata: options.metadata,
350                if_match: options.if_match,
351                if_none_match: options.if_none_match,
352            })),
353        };
354
355        let body = body.as_ref();
356        let data = body
357            .chunks(WRITE_CHUNK_SIZE)
358            .filter(|chunk| !chunk.is_empty())
359            .map(|chunk| pb::WriteObjectRequest {
360                msg: Some(pb::write_object_request::Msg::Data(chunk.to_vec())),
361            })
362            .collect::<Vec<_>>();
363
364        let response = self
365            .client
366            .write_object(iter(std::iter::once(open).chain(data.into_iter())))
367            .await
368            .map_err(map_status)?;
369        required_object_meta(
370            response.into_inner().meta,
371            "write object response missing metadata",
372        )
373    }
374
375    pub async fn write_object_chunks<I, B>(
376        &mut self,
377        reference: ObjectRef,
378        chunks: I,
379        options: Option<WriteOptions>,
380    ) -> ClientResult<ObjectMeta>
381    where
382        I: IntoIterator<Item = B>,
383        I::IntoIter: Send + 'static,
384        B: AsRef<[u8]> + Send + 'static,
385    {
386        let options = options.unwrap_or_default();
387        let open = std::iter::once(pb::WriteObjectRequest {
388            msg: Some(pb::write_object_request::Msg::Open(pb::WriteObjectOpen {
389                r#ref: Some(object_ref_to_proto(reference)),
390                content_type: options.content_type,
391                cache_control: options.cache_control,
392                content_disposition: options.content_disposition,
393                content_encoding: options.content_encoding,
394                content_language: options.content_language,
395                metadata: options.metadata,
396                if_match: options.if_match,
397                if_none_match: options.if_none_match,
398            })),
399        });
400
401        let data = chunks.into_iter().filter_map(|chunk| {
402            let bytes = chunk.as_ref();
403            if bytes.is_empty() {
404                return None;
405            }
406            Some(pb::WriteObjectRequest {
407                msg: Some(pb::write_object_request::Msg::Data(bytes.to_vec())),
408            })
409        });
410
411        let response = self
412            .client
413            .write_object(iter(open.chain(data)))
414            .await
415            .map_err(map_status)?;
416        required_object_meta(
417            response.into_inner().meta,
418            "write object response missing metadata",
419        )
420    }
421
422    pub async fn delete_object(&mut self, reference: ObjectRef) -> ClientResult<()> {
423        self.client
424            .delete_object(pb::DeleteObjectRequest {
425                r#ref: Some(object_ref_to_proto(reference)),
426            })
427            .await
428            .map_err(map_status)?;
429        Ok(())
430    }
431
432    pub async fn list_objects(&mut self, options: ListOptions) -> ClientResult<ListPage> {
433        let response = self
434            .client
435            .list_objects(pb::ListObjectsRequest {
436                bucket: options.bucket,
437                prefix: options.prefix,
438                delimiter: options.delimiter,
439                continuation_token: options.continuation_token,
440                start_after: options.start_after,
441                max_keys: options.max_keys,
442            })
443            .await
444            .map_err(map_status)?;
445        Ok(list_page_from_proto(response.into_inner()))
446    }
447
448    pub async fn copy_object(
449        &mut self,
450        source: ObjectRef,
451        destination: ObjectRef,
452        options: Option<CopyOptions>,
453    ) -> ClientResult<ObjectMeta> {
454        let options = options.unwrap_or_default();
455        let response = self
456            .client
457            .copy_object(pb::CopyObjectRequest {
458                source: Some(object_ref_to_proto(source)),
459                destination: Some(object_ref_to_proto(destination)),
460                if_match: options.if_match,
461                if_none_match: options.if_none_match,
462            })
463            .await
464            .map_err(map_status)?;
465        required_object_meta(
466            response.into_inner().meta,
467            "copy object response missing metadata",
468        )
469    }
470
471    pub async fn presign_object(
472        &mut self,
473        reference: ObjectRef,
474        options: Option<PresignOptions>,
475    ) -> ClientResult<PresignResult> {
476        let options = options.unwrap_or_default();
477        let expires_seconds = i64::try_from(options.expires.as_secs()).unwrap_or(i64::MAX);
478        let response = self
479            .client
480            .presign_object(pb::PresignObjectRequest {
481                r#ref: Some(object_ref_to_proto(reference)),
482                method: presign_method_to_proto(options.method) as i32,
483                expires_seconds,
484                content_type: options.content_type,
485                content_disposition: options.content_disposition,
486                headers: options.headers,
487            })
488            .await
489            .map_err(map_status)?;
490        Ok(presign_result_from_proto(
491            response.into_inner(),
492            options.method,
493        ))
494    }
495}
496
497pub struct Object {
498    client: ProtoS3Client<Channel>,
499    reference: ObjectRef,
500}
501
502impl Object {
503    pub fn reference(&self) -> &ObjectRef {
504        &self.reference
505    }
506
507    pub async fn stat(&mut self) -> ClientResult<ObjectMeta> {
508        let mut client = S3 {
509            client: self.client.clone(),
510        };
511        client.head_object(self.reference.clone()).await
512    }
513
514    pub async fn exists(&mut self) -> ClientResult<bool> {
515        match self.stat().await {
516            Ok(_) => Ok(true),
517            Err(S3Error::NotFound) => Ok(false),
518            Err(error) => Err(error),
519        }
520    }
521
522    pub async fn stream(&mut self, options: Option<ReadOptions>) -> ClientResult<ObjectReader> {
523        let mut client = S3 {
524            client: self.client.clone(),
525        };
526        client.read_object(self.reference.clone(), options).await
527    }
528
529    pub async fn bytes(&mut self, options: Option<ReadOptions>) -> ClientResult<Vec<u8>> {
530        self.stream(options).await?.bytes().await
531    }
532
533    pub async fn text(&mut self, options: Option<ReadOptions>) -> ClientResult<String> {
534        self.stream(options).await?.text().await
535    }
536
537    pub async fn json<T>(&mut self, options: Option<ReadOptions>) -> ClientResult<T>
538    where
539        T: DeserializeOwned,
540    {
541        self.stream(options).await?.json().await
542    }
543
544    pub async fn write<B>(
545        &mut self,
546        body: B,
547        options: Option<WriteOptions>,
548    ) -> ClientResult<ObjectMeta>
549    where
550        B: AsRef<[u8]>,
551    {
552        let mut client = S3 {
553            client: self.client.clone(),
554        };
555        client
556            .write_object(self.reference.clone(), body, options)
557            .await
558    }
559
560    pub async fn write_chunks<I, B>(
561        &mut self,
562        chunks: I,
563        options: Option<WriteOptions>,
564    ) -> ClientResult<ObjectMeta>
565    where
566        I: IntoIterator<Item = B>,
567        I::IntoIter: Send + 'static,
568        B: AsRef<[u8]> + Send + 'static,
569    {
570        let mut client = S3 {
571            client: self.client.clone(),
572        };
573        client
574            .write_object_chunks(self.reference.clone(), chunks, options)
575            .await
576    }
577
578    pub async fn write_bytes(
579        &mut self,
580        body: impl AsRef<[u8]>,
581        options: Option<WriteOptions>,
582    ) -> ClientResult<ObjectMeta> {
583        self.write(body, options).await
584    }
585
586    pub async fn write_string(
587        &mut self,
588        body: impl AsRef<str>,
589        options: Option<WriteOptions>,
590    ) -> ClientResult<ObjectMeta> {
591        self.write(body.as_ref().as_bytes(), options).await
592    }
593
594    pub async fn write_json<T>(
595        &mut self,
596        value: &T,
597        options: Option<WriteOptions>,
598    ) -> ClientResult<ObjectMeta>
599    where
600        T: serde::Serialize + ?Sized,
601    {
602        let body = serde_json::to_vec(value)?;
603        let options = match options {
604            Some(mut options) => {
605                if options.content_type.is_empty() {
606                    options.content_type = "application/json".to_string();
607                }
608                Some(options)
609            }
610            None => Some(WriteOptions {
611                content_type: "application/json".to_string(),
612                ..WriteOptions::default()
613            }),
614        };
615        self.write(body, options).await
616    }
617
618    pub async fn delete(&mut self) -> ClientResult<()> {
619        let mut client = S3 {
620            client: self.client.clone(),
621        };
622        client.delete_object(self.reference.clone()).await
623    }
624
625    pub async fn presign(
626        &mut self,
627        options: Option<PresignOptions>,
628    ) -> ClientResult<PresignResult> {
629        let mut client = S3 {
630            client: self.client.clone(),
631        };
632        client.presign_object(self.reference.clone(), options).await
633    }
634}
635
636pub struct ObjectReader {
637    meta: ObjectMeta,
638    stream: tonic::Streaming<pb::ReadObjectChunk>,
639}
640
641impl ObjectReader {
642    pub fn meta(&self) -> &ObjectMeta {
643        &self.meta
644    }
645
646    pub async fn next_chunk(&mut self) -> ClientResult<Option<Vec<u8>>> {
647        loop {
648            let Some(message) = self.stream.message().await.map_err(map_status)? else {
649                return Ok(None);
650            };
651
652            match message.result {
653                Some(pb::read_object_chunk::Result::Data(data)) => {
654                    if data.is_empty() {
655                        continue;
656                    }
657                    return Ok(Some(data));
658                }
659                Some(pb::read_object_chunk::Result::Meta(_)) => {
660                    return Err(S3Error::Protocol(
661                        "read stream emitted metadata after the initial frame".to_string(),
662                    ));
663                }
664                None => continue,
665            }
666        }
667    }
668
669    pub async fn bytes(mut self) -> ClientResult<Vec<u8>> {
670        let mut body = Vec::new();
671        while let Some(chunk) = self.next_chunk().await? {
672            body.extend_from_slice(&chunk);
673        }
674        Ok(body)
675    }
676
677    pub async fn text(self) -> ClientResult<String> {
678        Ok(String::from_utf8(self.bytes().await?)?)
679    }
680
681    pub async fn json<T>(self) -> ClientResult<T>
682    where
683        T: DeserializeOwned,
684    {
685        Ok(serde_json::from_slice(&self.bytes().await?)?)
686    }
687}
688
689pub fn s3_socket_env(name: &str) -> String {
690    let trimmed = name.trim();
691    if trimmed.is_empty() {
692        return ENV_S3_SOCKET.to_string();
693    }
694    let mut env = String::from(ENV_S3_SOCKET);
695    env.push('_');
696    for ch in trimmed.chars() {
697        if ch.is_ascii_alphanumeric() {
698            env.push(ch.to_ascii_uppercase());
699        } else {
700            env.push('_');
701        }
702    }
703    env
704}
705
706fn map_status(err: tonic::Status) -> S3Error {
707    match err.code() {
708        tonic::Code::NotFound => S3Error::NotFound,
709        tonic::Code::FailedPrecondition => S3Error::PreconditionFailed,
710        tonic::Code::OutOfRange => S3Error::InvalidRange,
711        _ => S3Error::Status(err),
712    }
713}
714
715fn object_ref_to_proto(reference: ObjectRef) -> pb::S3ObjectRef {
716    pb::S3ObjectRef {
717        bucket: reference.bucket,
718        key: reference.key,
719        version_id: reference.version_id,
720    }
721}
722
723fn object_meta_from_proto(meta: pb::S3ObjectMeta) -> ObjectMeta {
724    ObjectMeta {
725        reference: meta
726            .r#ref
727            .map(|reference| ObjectRef {
728                bucket: reference.bucket,
729                key: reference.key,
730                version_id: reference.version_id,
731            })
732            .unwrap_or_default(),
733        etag: meta.etag,
734        size: meta.size,
735        content_type: meta.content_type,
736        last_modified: meta.last_modified,
737        metadata: meta.metadata,
738        storage_class: meta.storage_class,
739    }
740}
741
742fn required_object_meta(meta: Option<pb::S3ObjectMeta>, context: &str) -> ClientResult<ObjectMeta> {
743    let meta = meta.ok_or_else(|| S3Error::Protocol(context.to_string()))?;
744    Ok(object_meta_from_proto(meta))
745}
746
747fn byte_range_to_proto(range: ByteRange) -> pb::ByteRange {
748    pb::ByteRange {
749        start: range.start,
750        end: range.end,
751    }
752}
753
754fn list_page_from_proto(page: pb::ListObjectsResponse) -> ListPage {
755    ListPage {
756        objects: page
757            .objects
758            .into_iter()
759            .map(object_meta_from_proto)
760            .collect(),
761        common_prefixes: page.common_prefixes,
762        next_continuation_token: page.next_continuation_token,
763        has_more: page.has_more,
764    }
765}
766
767fn presign_method_to_proto(method: PresignMethod) -> pb::PresignMethod {
768    match method {
769        PresignMethod::Unspecified => pb::PresignMethod::Unspecified,
770        PresignMethod::Get => pb::PresignMethod::Get,
771        PresignMethod::Put => pb::PresignMethod::Put,
772        PresignMethod::Delete => pb::PresignMethod::Delete,
773        PresignMethod::Head => pb::PresignMethod::Head,
774    }
775}
776
777fn presign_method_from_proto(method: i32) -> PresignMethod {
778    match pb::PresignMethod::try_from(method).unwrap_or(pb::PresignMethod::Unspecified) {
779        pb::PresignMethod::Get => PresignMethod::Get,
780        pb::PresignMethod::Put => PresignMethod::Put,
781        pb::PresignMethod::Delete => PresignMethod::Delete,
782        pb::PresignMethod::Head => PresignMethod::Head,
783        pb::PresignMethod::Unspecified => PresignMethod::Unspecified,
784    }
785}
786
787fn presign_result_from_proto(
788    result: pb::PresignObjectResponse,
789    requested_method: PresignMethod,
790) -> PresignResult {
791    let method = presign_method_from_proto(result.method);
792    PresignResult {
793        url: result.url,
794        method: if method == PresignMethod::Unspecified {
795            requested_method
796        } else {
797            method
798        },
799        expires_at: result.expires_at,
800        headers: result.headers,
801    }
802}