Skip to main content

gestalt/
s3.rs

1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use hyper_util::rt::TokioIo;
6use serde::de::DeserializeOwned;
7use tokio_stream::iter;
8use tonic::codegen::async_trait;
9use tonic::metadata::MetadataValue;
10use tonic::service::Interceptor;
11use tonic::service::interceptor::InterceptedService;
12use tonic::transport::{Channel, ClientTlsConfig, Endpoint, Uri};
13use tower::service_fn;
14
15use crate::api::RuntimeMetadata;
16use crate::error::Result as ProviderResult;
17use crate::generated::v1::{
18    self as pb, s3_client::S3Client as ProtoS3Client,
19    s3_object_access_client::S3ObjectAccessClient as ProtoS3ObjectAccessClient,
20};
21
22type ClientResult<T> = std::result::Result<T, S3Error>;
23type S3Transport = InterceptedService<Channel, RelayTokenInterceptor>;
24
25/// Default Unix-socket environment variable used by [`S3::connect`].
26pub const ENV_S3_SOCKET: &str = "GESTALT_S3_SOCKET";
27/// Suffix added to named S3 socket variables for relay-token variables.
28pub const ENV_S3_SOCKET_TOKEN_SUFFIX: &str = "_TOKEN";
29/// Default relay-token environment variable used by [`S3::connect`].
30pub const ENV_S3_SOCKET_TOKEN: &str = "GESTALT_S3_SOCKET_TOKEN";
31const S3_RELAY_TOKEN_HEADER: &str = "x-gestalt-host-service-relay-token";
32const WRITE_CHUNK_SIZE: usize = 64 * 1024;
33
34#[derive(Debug, thiserror::Error)]
35/// Errors returned by the S3 transport client.
36pub enum S3Error {
37    /// The referenced object does not exist.
38    #[error("not found")]
39    NotFound,
40    /// Conditional request headers failed.
41    #[error("precondition failed")]
42    PreconditionFailed,
43    /// The requested byte range is invalid.
44    #[error("invalid range")]
45    InvalidRange,
46    /// The host returned a protocol value the SDK could not represent.
47    #[error("{0}")]
48    Protocol(String),
49    /// The host-service transport could not be created.
50    #[error("{0}")]
51    Transport(#[from] tonic::transport::Error),
52    /// The host-service RPC returned a gRPC status.
53    #[error("{0}")]
54    Status(#[from] tonic::Status),
55    /// Required environment or target configuration was invalid.
56    #[error("{0}")]
57    Env(String),
58    /// JSON encoding or decoding failed.
59    #[error("{0}")]
60    Json(#[from] serde_json::Error),
61    /// UTF-8 decoding failed.
62    #[error("{0}")]
63    Utf8(#[from] std::string::FromUtf8Error),
64}
65
66#[derive(Clone, Debug, Default, Eq, PartialEq)]
67/// Identifies one object or object version.
68pub struct ObjectRef {
69    /// Bucket name.
70    pub bucket: String,
71    /// Object key.
72    pub key: String,
73    /// Optional object version id.
74    pub version_id: String,
75}
76
77#[derive(Clone, Debug, Default, PartialEq)]
78/// Describes an object returned by the provider.
79pub struct ObjectMeta {
80    /// Object reference, including version when returned by the provider.
81    pub reference: ObjectRef,
82    /// Entity tag returned by storage.
83    pub etag: String,
84    /// Object size in bytes.
85    pub size: i64,
86    /// MIME content type.
87    pub content_type: String,
88    /// Last-modified timestamp, when known.
89    pub last_modified: Option<prost_types::Timestamp>,
90    /// User metadata associated with the object.
91    pub metadata: BTreeMap<String, String>,
92    /// Storage class reported by the provider.
93    pub storage_class: String,
94}
95
96#[derive(Clone, Debug, Default, Eq, PartialEq)]
97/// Requests a half-open slice of an object's bytes.
98pub struct ByteRange {
99    /// Inclusive starting byte offset.
100    pub start: Option<i64>,
101    /// Inclusive ending byte offset.
102    pub end: Option<i64>,
103}
104
105#[derive(Clone, Debug, Default, PartialEq)]
106/// Configures conditional and ranged reads.
107pub struct ReadOptions {
108    /// Optional byte range to read.
109    pub range: Option<ByteRange>,
110    /// Read only if the current ETag matches this value.
111    pub if_match: String,
112    /// Read only if the current ETag does not match this value.
113    pub if_none_match: String,
114    /// Read only if the object has changed since this timestamp.
115    pub if_modified_since: Option<prost_types::Timestamp>,
116    /// Read only if the object has not changed since this timestamp.
117    pub if_unmodified_since: Option<prost_types::Timestamp>,
118}
119
120#[derive(Clone, Debug, Default, Eq, PartialEq)]
121/// Configures object writes.
122pub struct WriteOptions {
123    /// MIME content type.
124    pub content_type: String,
125    /// Cache-Control header value.
126    pub cache_control: String,
127    /// Content-Disposition header value.
128    pub content_disposition: String,
129    /// Content-Encoding header value.
130    pub content_encoding: String,
131    /// Content-Language header value.
132    pub content_language: String,
133    /// User metadata to store with the object.
134    pub metadata: BTreeMap<String, String>,
135    /// Write only if the current ETag matches this value.
136    pub if_match: String,
137    /// Write only if the object does not exist or has a different ETag.
138    pub if_none_match: String,
139}
140
141#[derive(Clone, Debug, Default, Eq, PartialEq)]
142/// Configures list-objects requests.
143pub struct ListOptions {
144    /// Bucket to list.
145    pub bucket: String,
146    /// Prefix filter.
147    pub prefix: String,
148    /// Delimiter for grouping common prefixes.
149    pub delimiter: String,
150    /// Continuation token from the previous page.
151    pub continuation_token: String,
152    /// Start listing after this key.
153    pub start_after: String,
154    /// Maximum number of keys to return.
155    pub max_keys: i32,
156}
157
158#[derive(Clone, Debug, Default, PartialEq)]
159/// Represents one page of list-objects results.
160pub struct ListPage {
161    /// Object metadata rows in this page.
162    pub objects: Vec<ObjectMeta>,
163    /// Common prefixes returned by delimiter grouping.
164    pub common_prefixes: Vec<String>,
165    /// Continuation token for the next page.
166    pub next_continuation_token: String,
167    /// Whether another page is available.
168    pub has_more: bool,
169}
170
171#[derive(Clone, Debug, Default, Eq, PartialEq)]
172/// Configures conditional copy requests.
173pub struct CopyOptions {
174    /// Copy only if the source ETag matches this value.
175    pub if_match: String,
176    /// Copy only if the source ETag does not match this value.
177    pub if_none_match: String,
178}
179
180#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
181/// Identifies the HTTP verb encoded into a presigned URL.
182pub enum PresignMethod {
183    /// Let the provider choose the default method.
184    #[default]
185    Unspecified,
186    /// HTTP GET.
187    Get,
188    /// HTTP PUT.
189    Put,
190    /// HTTP DELETE.
191    Delete,
192    /// HTTP HEAD.
193    Head,
194}
195
196#[derive(Clone, Debug, Default, Eq, PartialEq)]
197/// Configures presigned URL generation.
198pub struct PresignOptions {
199    /// HTTP method encoded into the URL.
200    pub method: PresignMethod,
201    /// Requested URL lifetime.
202    pub expires: Duration,
203    /// Required content type for upload-style URLs.
204    pub content_type: String,
205    /// Required content disposition for upload-style URLs.
206    pub content_disposition: String,
207    /// Additional signed headers.
208    pub headers: BTreeMap<String, String>,
209}
210
211#[derive(Clone, Debug, Default, PartialEq)]
212/// Contains a presigned URL plus any required headers.
213pub struct PresignResult {
214    /// URL clients should use.
215    pub url: String,
216    /// HTTP method clients should use.
217    pub method: PresignMethod,
218    /// Expiration timestamp, when known.
219    pub expires_at: Option<prost_types::Timestamp>,
220    /// Headers clients must send with the URL.
221    pub headers: BTreeMap<String, String>,
222}
223
224/// Options for creating a host-mediated object access URL.
225pub type ObjectAccessURLOptions = PresignOptions;
226/// Result returned when creating a host-mediated object access URL.
227pub type ObjectAccessURL = PresignResult;
228
229#[async_trait]
230/// Lifecycle and RPC contract for S3-compatible providers.
231pub trait S3Provider: pb::s3_server::S3 + Send + Sync + 'static {
232    /// Configures the provider before it starts serving requests.
233    async fn configure(
234        &self,
235        _name: &str,
236        _config: serde_json::Map<String, serde_json::Value>,
237    ) -> ProviderResult<()> {
238        Ok(())
239    }
240
241    /// Returns runtime metadata that should augment the static manifest.
242    fn metadata(&self) -> Option<RuntimeMetadata> {
243        None
244    }
245
246    /// Returns non-fatal warnings the host should surface to users.
247    fn warnings(&self) -> Vec<String> {
248        Vec::new()
249    }
250
251    /// Performs an optional health check.
252    async fn health_check(&self) -> ProviderResult<()> {
253        Ok(())
254    }
255
256    /// Starts provider-owned background work after configuration.
257    async fn start(&self) -> ProviderResult<()> {
258        Ok(())
259    }
260
261    /// Shuts the provider down before the runtime exits.
262    async fn close(&self) -> ProviderResult<()> {
263        Ok(())
264    }
265}
266
267#[async_trait]
268impl<T> pb::s3_server::S3 for Arc<T>
269where
270    T: S3Provider,
271{
272    type ReadObjectStream = <T as pb::s3_server::S3>::ReadObjectStream;
273
274    async fn head_object(
275        &self,
276        request: tonic::Request<pb::HeadObjectRequest>,
277    ) -> std::result::Result<tonic::Response<pb::HeadObjectResponse>, tonic::Status> {
278        <T as pb::s3_server::S3>::head_object(self.as_ref(), request).await
279    }
280
281    async fn read_object(
282        &self,
283        request: tonic::Request<pb::ReadObjectRequest>,
284    ) -> std::result::Result<tonic::Response<Self::ReadObjectStream>, tonic::Status> {
285        <T as pb::s3_server::S3>::read_object(self.as_ref(), request).await
286    }
287
288    async fn write_object(
289        &self,
290        request: tonic::Request<tonic::Streaming<pb::WriteObjectRequest>>,
291    ) -> std::result::Result<tonic::Response<pb::WriteObjectResponse>, tonic::Status> {
292        <T as pb::s3_server::S3>::write_object(self.as_ref(), request).await
293    }
294
295    async fn delete_object(
296        &self,
297        request: tonic::Request<pb::DeleteObjectRequest>,
298    ) -> std::result::Result<tonic::Response<()>, tonic::Status> {
299        <T as pb::s3_server::S3>::delete_object(self.as_ref(), request).await
300    }
301
302    async fn list_objects(
303        &self,
304        request: tonic::Request<pb::ListObjectsRequest>,
305    ) -> std::result::Result<tonic::Response<pb::ListObjectsResponse>, tonic::Status> {
306        <T as pb::s3_server::S3>::list_objects(self.as_ref(), request).await
307    }
308
309    async fn copy_object(
310        &self,
311        request: tonic::Request<pb::CopyObjectRequest>,
312    ) -> std::result::Result<tonic::Response<pb::CopyObjectResponse>, tonic::Status> {
313        <T as pb::s3_server::S3>::copy_object(self.as_ref(), request).await
314    }
315
316    async fn presign_object(
317        &self,
318        request: tonic::Request<pb::PresignObjectRequest>,
319    ) -> std::result::Result<tonic::Response<pb::PresignObjectResponse>, tonic::Status> {
320        <T as pb::s3_server::S3>::presign_object(self.as_ref(), request).await
321    }
322}
323
324/// Client for a running S3 provider.
325pub struct S3 {
326    client: ProtoS3Client<S3Transport>,
327    object_access_client: ProtoS3ObjectAccessClient<S3Transport>,
328}
329
330impl S3 {
331    /// Connects to the default S3 transport socket.
332    pub async fn connect() -> ClientResult<Self> {
333        Self::connect_named("").await
334    }
335
336    /// Connects to a named S3 transport socket.
337    pub async fn connect_named(name: &str) -> ClientResult<Self> {
338        let env_name = s3_socket_env(name);
339        let target =
340            std::env::var(&env_name).map_err(|_| S3Error::Env(format!("{env_name} is not set")))?;
341        let token = std::env::var(s3_socket_token_env(name)).unwrap_or_default();
342
343        let channel = match parse_s3_target(&target)? {
344            S3Target::Unix(path) => {
345                Endpoint::try_from("http://[::]:50051")?
346                    .connect_with_connector(service_fn(move |_: Uri| {
347                        let path = path.clone();
348                        async move {
349                            tokio::net::UnixStream::connect(path)
350                                .await
351                                .map(TokioIo::new)
352                        }
353                    }))
354                    .await?
355            }
356            S3Target::Tcp(address) => {
357                Endpoint::from_shared(format!("http://{address}"))?
358                    .connect()
359                    .await?
360            }
361            S3Target::Tls(address) => {
362                Endpoint::from_shared(format!("https://{address}"))?
363                    .tls_config(ClientTlsConfig::new().with_native_roots())?
364                    .connect()
365                    .await?
366            }
367        };
368
369        let interceptor = relay_token_interceptor(token.trim())?;
370        Ok(Self {
371            client: ProtoS3Client::with_interceptor(channel.clone(), interceptor.clone()),
372            object_access_client: ProtoS3ObjectAccessClient::with_interceptor(channel, interceptor),
373        })
374    }
375
376    /// Returns a convenience handle for one object key.
377    pub fn object(&self, bucket: &str, key: &str) -> Object {
378        Object {
379            client: self.client.clone(),
380            object_access_client: self.object_access_client.clone(),
381            reference: ObjectRef {
382                bucket: bucket.to_string(),
383                key: key.to_string(),
384                version_id: String::new(),
385            },
386        }
387    }
388
389    /// Returns a convenience handle for one object version.
390    pub fn object_version(&self, bucket: &str, key: &str, version_id: &str) -> Object {
391        Object {
392            client: self.client.clone(),
393            object_access_client: self.object_access_client.clone(),
394            reference: ObjectRef {
395                bucket: bucket.to_string(),
396                key: key.to_string(),
397                version_id: version_id.to_string(),
398            },
399        }
400    }
401
402    /// Fetches metadata for one object.
403    pub async fn head_object(&mut self, reference: ObjectRef) -> ClientResult<ObjectMeta> {
404        let response = self
405            .client
406            .head_object(pb::HeadObjectRequest {
407                r#ref: Some(object_ref_to_proto(reference)),
408            })
409            .await
410            .map_err(map_status)?;
411        required_object_meta(
412            response.into_inner().meta,
413            "head object response missing metadata",
414        )
415    }
416
417    /// Opens a streaming object reader.
418    pub async fn read_object(
419        &mut self,
420        reference: ObjectRef,
421        options: Option<ReadOptions>,
422    ) -> ClientResult<ObjectReader> {
423        let options = options.unwrap_or_default();
424        let mut stream = self
425            .client
426            .read_object(pb::ReadObjectRequest {
427                r#ref: Some(object_ref_to_proto(reference)),
428                range: options.range.map(byte_range_to_proto),
429                if_match: options.if_match,
430                if_none_match: options.if_none_match,
431                if_modified_since: options.if_modified_since,
432                if_unmodified_since: options.if_unmodified_since,
433            })
434            .await
435            .map_err(map_status)?
436            .into_inner();
437
438        let first =
439            stream.message().await.map_err(map_status)?.ok_or_else(|| {
440                S3Error::Protocol("read stream ended before metadata".to_string())
441            })?;
442
443        let meta = match first.result {
444            Some(pb::read_object_chunk::Result::Meta(meta)) => object_meta_from_proto(meta),
445            Some(pb::read_object_chunk::Result::Data(_)) => {
446                return Err(S3Error::Protocol(
447                    "read stream started with data instead of metadata".to_string(),
448                ));
449            }
450            None => {
451                return Err(S3Error::Protocol(
452                    "read stream started with an empty frame".to_string(),
453                ));
454            }
455        };
456
457        Ok(ObjectReader { meta, stream })
458    }
459
460    /// Uploads an object from a contiguous byte slice.
461    pub async fn write_object<B>(
462        &mut self,
463        reference: ObjectRef,
464        body: B,
465        options: Option<WriteOptions>,
466    ) -> ClientResult<ObjectMeta>
467    where
468        B: AsRef<[u8]>,
469    {
470        let options = options.unwrap_or_default();
471        let open = pb::WriteObjectRequest {
472            msg: Some(pb::write_object_request::Msg::Open(pb::WriteObjectOpen {
473                r#ref: Some(object_ref_to_proto(reference)),
474                content_type: options.content_type,
475                cache_control: options.cache_control,
476                content_disposition: options.content_disposition,
477                content_encoding: options.content_encoding,
478                content_language: options.content_language,
479                metadata: options.metadata,
480                if_match: options.if_match,
481                if_none_match: options.if_none_match,
482            })),
483        };
484
485        let body = body.as_ref();
486        let data = body
487            .chunks(WRITE_CHUNK_SIZE)
488            .filter(|chunk| !chunk.is_empty())
489            .map(|chunk| pb::WriteObjectRequest {
490                msg: Some(pb::write_object_request::Msg::Data(chunk.to_vec())),
491            })
492            .collect::<Vec<_>>();
493
494        let response = self
495            .client
496            .write_object(iter(std::iter::once(open).chain(data)))
497            .await
498            .map_err(map_status)?;
499        required_object_meta(
500            response.into_inner().meta,
501            "write object response missing metadata",
502        )
503    }
504
505    /// Uploads an object from multiple pre-chunked buffers.
506    pub async fn write_object_chunks<I, B>(
507        &mut self,
508        reference: ObjectRef,
509        chunks: I,
510        options: Option<WriteOptions>,
511    ) -> ClientResult<ObjectMeta>
512    where
513        I: IntoIterator<Item = B>,
514        I::IntoIter: Send + 'static,
515        B: AsRef<[u8]> + Send + 'static,
516    {
517        let options = options.unwrap_or_default();
518        let open = std::iter::once(pb::WriteObjectRequest {
519            msg: Some(pb::write_object_request::Msg::Open(pb::WriteObjectOpen {
520                r#ref: Some(object_ref_to_proto(reference)),
521                content_type: options.content_type,
522                cache_control: options.cache_control,
523                content_disposition: options.content_disposition,
524                content_encoding: options.content_encoding,
525                content_language: options.content_language,
526                metadata: options.metadata,
527                if_match: options.if_match,
528                if_none_match: options.if_none_match,
529            })),
530        });
531
532        let data = chunks.into_iter().filter_map(|chunk| {
533            let bytes = chunk.as_ref();
534            if bytes.is_empty() {
535                return None;
536            }
537            Some(pb::WriteObjectRequest {
538                msg: Some(pb::write_object_request::Msg::Data(bytes.to_vec())),
539            })
540        });
541
542        let response = self
543            .client
544            .write_object(iter(open.chain(data)))
545            .await
546            .map_err(map_status)?;
547        required_object_meta(
548            response.into_inner().meta,
549            "write object response missing metadata",
550        )
551    }
552
553    /// Deletes one object.
554    pub async fn delete_object(&mut self, reference: ObjectRef) -> ClientResult<()> {
555        self.client
556            .delete_object(pb::DeleteObjectRequest {
557                r#ref: Some(object_ref_to_proto(reference)),
558            })
559            .await
560            .map_err(map_status)?;
561        Ok(())
562    }
563
564    /// Lists objects in a bucket.
565    pub async fn list_objects(&mut self, options: ListOptions) -> ClientResult<ListPage> {
566        let response = self
567            .client
568            .list_objects(pb::ListObjectsRequest {
569                bucket: options.bucket,
570                prefix: options.prefix,
571                delimiter: options.delimiter,
572                continuation_token: options.continuation_token,
573                start_after: options.start_after,
574                max_keys: options.max_keys,
575            })
576            .await
577            .map_err(map_status)?;
578        Ok(list_page_from_proto(response.into_inner()))
579    }
580
581    /// Copies one object to another location.
582    pub async fn copy_object(
583        &mut self,
584        source: ObjectRef,
585        destination: ObjectRef,
586        options: Option<CopyOptions>,
587    ) -> ClientResult<ObjectMeta> {
588        let options = options.unwrap_or_default();
589        let response = self
590            .client
591            .copy_object(pb::CopyObjectRequest {
592                source: Some(object_ref_to_proto(source)),
593                destination: Some(object_ref_to_proto(destination)),
594                if_match: options.if_match,
595                if_none_match: options.if_none_match,
596            })
597            .await
598            .map_err(map_status)?;
599        required_object_meta(
600            response.into_inner().meta,
601            "copy object response missing metadata",
602        )
603    }
604
605    /// Creates a provider-generated presigned URL.
606    pub async fn presign_object(
607        &mut self,
608        reference: ObjectRef,
609        options: Option<PresignOptions>,
610    ) -> ClientResult<PresignResult> {
611        let options = options.unwrap_or_default();
612        let expires_seconds = i64::try_from(options.expires.as_secs()).unwrap_or(i64::MAX);
613        let response = self
614            .client
615            .presign_object(pb::PresignObjectRequest {
616                r#ref: Some(object_ref_to_proto(reference)),
617                method: presign_method_to_proto(options.method) as i32,
618                expires_seconds,
619                content_type: options.content_type,
620                content_disposition: options.content_disposition,
621                headers: options.headers,
622            })
623            .await
624            .map_err(map_status)?;
625        Ok(presign_result_from_proto(
626            response.into_inner(),
627            options.method,
628        ))
629    }
630
631    /// Creates a host-mediated object-access URL.
632    pub async fn create_object_access_url(
633        &mut self,
634        reference: ObjectRef,
635        options: Option<ObjectAccessURLOptions>,
636    ) -> ClientResult<ObjectAccessURL> {
637        let options = options.unwrap_or_default();
638        let expires_seconds = i64::try_from(options.expires.as_secs()).unwrap_or(i64::MAX);
639        let response = self
640            .object_access_client
641            .create_object_access_url(pb::CreateObjectAccessUrlRequest {
642                r#ref: Some(object_ref_to_proto(reference)),
643                method: presign_method_to_proto(options.method) as i32,
644                expires_seconds,
645                content_type: options.content_type,
646                content_disposition: options.content_disposition,
647                headers: options.headers,
648            })
649            .await
650            .map_err(map_status)?;
651        Ok(object_access_url_from_proto(
652            response.into_inner(),
653            options.method,
654        ))
655    }
656
657    /// Alias for [`S3::create_object_access_url`].
658    pub async fn create_access_url(
659        &mut self,
660        reference: ObjectRef,
661        options: Option<ObjectAccessURLOptions>,
662    ) -> ClientResult<ObjectAccessURL> {
663        self.create_object_access_url(reference, options).await
664    }
665}
666
667/// Convenience wrapper around repeated operations on one object key.
668pub struct Object {
669    client: ProtoS3Client<S3Transport>,
670    object_access_client: ProtoS3ObjectAccessClient<S3Transport>,
671    reference: ObjectRef,
672}
673
674impl Object {
675    /// Returns the referenced object key and version.
676    pub fn reference(&self) -> &ObjectRef {
677        &self.reference
678    }
679
680    /// Fetches metadata for the current object.
681    pub async fn stat(&mut self) -> ClientResult<ObjectMeta> {
682        let mut client = S3 {
683            client: self.client.clone(),
684            object_access_client: self.object_access_client.clone(),
685        };
686        client.head_object(self.reference.clone()).await
687    }
688
689    /// Reports whether the current object exists.
690    pub async fn exists(&mut self) -> ClientResult<bool> {
691        match self.stat().await {
692            Ok(_) => Ok(true),
693            Err(S3Error::NotFound) => Ok(false),
694            Err(error) => Err(error),
695        }
696    }
697
698    /// Opens a streaming reader for the current object.
699    pub async fn stream(&mut self, options: Option<ReadOptions>) -> ClientResult<ObjectReader> {
700        let mut client = S3 {
701            client: self.client.clone(),
702            object_access_client: self.object_access_client.clone(),
703        };
704        client.read_object(self.reference.clone(), options).await
705    }
706
707    /// Reads the entire object into memory.
708    pub async fn bytes(&mut self, options: Option<ReadOptions>) -> ClientResult<Vec<u8>> {
709        self.stream(options).await?.bytes().await
710    }
711
712    /// Reads the entire object as UTF-8 text.
713    pub async fn text(&mut self, options: Option<ReadOptions>) -> ClientResult<String> {
714        self.stream(options).await?.text().await
715    }
716
717    /// Reads and decodes the entire object as JSON.
718    pub async fn json<T>(&mut self, options: Option<ReadOptions>) -> ClientResult<T>
719    where
720        T: DeserializeOwned,
721    {
722        self.stream(options).await?.json().await
723    }
724
725    /// Uploads a new object body.
726    pub async fn write<B>(
727        &mut self,
728        body: B,
729        options: Option<WriteOptions>,
730    ) -> ClientResult<ObjectMeta>
731    where
732        B: AsRef<[u8]>,
733    {
734        let mut client = S3 {
735            client: self.client.clone(),
736            object_access_client: self.object_access_client.clone(),
737        };
738        client
739            .write_object(self.reference.clone(), body, options)
740            .await
741    }
742
743    /// Uploads a pre-chunked object body.
744    pub async fn write_chunks<I, B>(
745        &mut self,
746        chunks: I,
747        options: Option<WriteOptions>,
748    ) -> ClientResult<ObjectMeta>
749    where
750        I: IntoIterator<Item = B>,
751        I::IntoIter: Send + 'static,
752        B: AsRef<[u8]> + Send + 'static,
753    {
754        let mut client = S3 {
755            client: self.client.clone(),
756            object_access_client: self.object_access_client.clone(),
757        };
758        client
759            .write_object_chunks(self.reference.clone(), chunks, options)
760            .await
761    }
762
763    /// Uploads raw bytes.
764    pub async fn write_bytes(
765        &mut self,
766        body: impl AsRef<[u8]>,
767        options: Option<WriteOptions>,
768    ) -> ClientResult<ObjectMeta> {
769        self.write(body, options).await
770    }
771
772    /// Uploads UTF-8 text.
773    pub async fn write_string(
774        &mut self,
775        body: impl AsRef<str>,
776        options: Option<WriteOptions>,
777    ) -> ClientResult<ObjectMeta> {
778        self.write(body.as_ref().as_bytes(), options).await
779    }
780
781    /// Uploads JSON, defaulting the content type when omitted.
782    pub async fn write_json<T>(
783        &mut self,
784        value: &T,
785        options: Option<WriteOptions>,
786    ) -> ClientResult<ObjectMeta>
787    where
788        T: serde::Serialize + ?Sized,
789    {
790        let body = serde_json::to_vec(value)?;
791        let options = match options {
792            Some(mut options) => {
793                if options.content_type.is_empty() {
794                    options.content_type = "application/json".to_string();
795                }
796                Some(options)
797            }
798            None => Some(WriteOptions {
799                content_type: "application/json".to_string(),
800                ..WriteOptions::default()
801            }),
802        };
803        self.write(body, options).await
804    }
805
806    /// Deletes the current object.
807    pub async fn delete(&mut self) -> ClientResult<()> {
808        let mut client = S3 {
809            client: self.client.clone(),
810            object_access_client: self.object_access_client.clone(),
811        };
812        client.delete_object(self.reference.clone()).await
813    }
814
815    /// Creates a presigned URL for the current object.
816    pub async fn presign(
817        &mut self,
818        options: Option<PresignOptions>,
819    ) -> ClientResult<PresignResult> {
820        let mut client = S3 {
821            client: self.client.clone(),
822            object_access_client: self.object_access_client.clone(),
823        };
824        client.presign_object(self.reference.clone(), options).await
825    }
826
827    /// Creates a host-mediated object-access URL for the current object.
828    pub async fn create_access_url(
829        &mut self,
830        options: Option<ObjectAccessURLOptions>,
831    ) -> ClientResult<ObjectAccessURL> {
832        let mut client = S3 {
833            client: self.client.clone(),
834            object_access_client: self.object_access_client.clone(),
835        };
836        client
837            .create_object_access_url(self.reference.clone(), options)
838            .await
839    }
840}
841
842/// Streaming reader returned by [`S3::read_object`] and [`Object::stream`].
843pub struct ObjectReader {
844    meta: ObjectMeta,
845    stream: tonic::Streaming<pb::ReadObjectChunk>,
846}
847
848impl ObjectReader {
849    /// Returns the metadata frame emitted at the start of the stream.
850    pub fn meta(&self) -> &ObjectMeta {
851        &self.meta
852    }
853
854    /// Returns the next non-empty body chunk.
855    pub async fn next_chunk(&mut self) -> ClientResult<Option<Vec<u8>>> {
856        loop {
857            let Some(message) = self.stream.message().await.map_err(map_status)? else {
858                return Ok(None);
859            };
860
861            match message.result {
862                Some(pb::read_object_chunk::Result::Data(data)) => {
863                    if data.is_empty() {
864                        continue;
865                    }
866                    return Ok(Some(data));
867                }
868                Some(pb::read_object_chunk::Result::Meta(_)) => {
869                    return Err(S3Error::Protocol(
870                        "read stream emitted metadata after the initial frame".to_string(),
871                    ));
872                }
873                None => continue,
874            }
875        }
876    }
877
878    /// Reads the remainder of the stream into memory.
879    pub async fn bytes(mut self) -> ClientResult<Vec<u8>> {
880        let mut body = Vec::new();
881        while let Some(chunk) = self.next_chunk().await? {
882            body.extend_from_slice(&chunk);
883        }
884        Ok(body)
885    }
886
887    /// Reads the remainder of the stream as UTF-8 text.
888    pub async fn text(self) -> ClientResult<String> {
889        Ok(String::from_utf8(self.bytes().await?)?)
890    }
891
892    /// Reads and decodes the remainder of the stream as JSON.
893    pub async fn json<T>(self) -> ClientResult<T>
894    where
895        T: DeserializeOwned,
896    {
897        Ok(serde_json::from_slice(&self.bytes().await?)?)
898    }
899}
900
901/// Returns the environment variable used for a named S3 socket.
902pub fn s3_socket_env(name: &str) -> String {
903    let trimmed = name.trim();
904    if trimmed.is_empty() {
905        return ENV_S3_SOCKET.to_string();
906    }
907    let mut env = String::from(ENV_S3_SOCKET);
908    env.push('_');
909    for ch in trimmed.chars() {
910        if ch.is_ascii_alphanumeric() {
911            env.push(ch.to_ascii_uppercase());
912        } else {
913            env.push('_');
914        }
915    }
916    env
917}
918
919/// Returns the environment variable used for a named S3 relay token.
920pub fn s3_socket_token_env(name: &str) -> String {
921    if name.trim().is_empty() {
922        return ENV_S3_SOCKET_TOKEN.to_string();
923    }
924    format!("{}{}", s3_socket_env(name), ENV_S3_SOCKET_TOKEN_SUFFIX)
925}
926
927enum S3Target {
928    Unix(String),
929    Tcp(String),
930    Tls(String),
931}
932
933fn parse_s3_target(raw_target: &str) -> Result<S3Target, S3Error> {
934    let target = raw_target.trim();
935    if target.is_empty() {
936        return Err(S3Error::Env("S3 transport target is required".to_string()));
937    }
938    if let Some(address) = target.strip_prefix("tcp://") {
939        let address = address.trim();
940        if address.is_empty() {
941            return Err(S3Error::Env(format!(
942                "S3 tcp target {raw_target:?} is missing host:port"
943            )));
944        }
945        return Ok(S3Target::Tcp(address.to_string()));
946    }
947    if let Some(address) = target.strip_prefix("tls://") {
948        let address = address.trim();
949        if address.is_empty() {
950            return Err(S3Error::Env(format!(
951                "S3 tls target {raw_target:?} is missing host:port"
952            )));
953        }
954        return Ok(S3Target::Tls(address.to_string()));
955    }
956    if let Some(path) = target.strip_prefix("unix://") {
957        let path = path.trim();
958        if path.is_empty() {
959            return Err(S3Error::Env(format!(
960                "S3 unix target {raw_target:?} is missing a socket path"
961            )));
962        }
963        return Ok(S3Target::Unix(path.to_string()));
964    }
965    if target.contains("://") {
966        let scheme = target.split("://").next().unwrap_or_default();
967        return Err(S3Error::Env(format!(
968            "unsupported S3 target scheme {scheme:?}"
969        )));
970    }
971    Ok(S3Target::Unix(target.to_string()))
972}
973
974fn relay_token_interceptor(token: &str) -> Result<RelayTokenInterceptor, S3Error> {
975    let header = if token.trim().is_empty() {
976        None
977    } else {
978        Some(
979            MetadataValue::try_from(token.to_string())
980                .map_err(|err| S3Error::Env(format!("invalid S3 relay token metadata: {err}")))?,
981        )
982    };
983    Ok(RelayTokenInterceptor { header })
984}
985
986#[derive(Clone)]
987struct RelayTokenInterceptor {
988    header: Option<MetadataValue<tonic::metadata::Ascii>>,
989}
990
991impl Interceptor for RelayTokenInterceptor {
992    fn call(
993        &mut self,
994        mut request: tonic::Request<()>,
995    ) -> std::result::Result<tonic::Request<()>, tonic::Status> {
996        if let Some(header) = self.header.clone() {
997            request.metadata_mut().insert(S3_RELAY_TOKEN_HEADER, header);
998        }
999        Ok(request)
1000    }
1001}
1002
1003fn map_status(err: tonic::Status) -> S3Error {
1004    match err.code() {
1005        tonic::Code::NotFound => S3Error::NotFound,
1006        tonic::Code::FailedPrecondition => S3Error::PreconditionFailed,
1007        tonic::Code::OutOfRange => S3Error::InvalidRange,
1008        _ => S3Error::Status(err),
1009    }
1010}
1011
1012fn object_ref_to_proto(reference: ObjectRef) -> pb::S3ObjectRef {
1013    pb::S3ObjectRef {
1014        bucket: reference.bucket,
1015        key: reference.key,
1016        version_id: reference.version_id,
1017    }
1018}
1019
1020fn object_meta_from_proto(meta: pb::S3ObjectMeta) -> ObjectMeta {
1021    ObjectMeta {
1022        reference: meta
1023            .r#ref
1024            .map(|reference| ObjectRef {
1025                bucket: reference.bucket,
1026                key: reference.key,
1027                version_id: reference.version_id,
1028            })
1029            .unwrap_or_default(),
1030        etag: meta.etag,
1031        size: meta.size,
1032        content_type: meta.content_type,
1033        last_modified: meta.last_modified,
1034        metadata: meta.metadata,
1035        storage_class: meta.storage_class,
1036    }
1037}
1038
1039fn required_object_meta(meta: Option<pb::S3ObjectMeta>, context: &str) -> ClientResult<ObjectMeta> {
1040    let meta = meta.ok_or_else(|| S3Error::Protocol(context.to_string()))?;
1041    Ok(object_meta_from_proto(meta))
1042}
1043
1044fn byte_range_to_proto(range: ByteRange) -> pb::ByteRange {
1045    pb::ByteRange {
1046        start: range.start,
1047        end: range.end,
1048    }
1049}
1050
1051fn list_page_from_proto(page: pb::ListObjectsResponse) -> ListPage {
1052    ListPage {
1053        objects: page
1054            .objects
1055            .into_iter()
1056            .map(object_meta_from_proto)
1057            .collect(),
1058        common_prefixes: page.common_prefixes,
1059        next_continuation_token: page.next_continuation_token,
1060        has_more: page.has_more,
1061    }
1062}
1063
1064fn presign_method_to_proto(method: PresignMethod) -> pb::PresignMethod {
1065    match method {
1066        PresignMethod::Unspecified => pb::PresignMethod::Unspecified,
1067        PresignMethod::Get => pb::PresignMethod::Get,
1068        PresignMethod::Put => pb::PresignMethod::Put,
1069        PresignMethod::Delete => pb::PresignMethod::Delete,
1070        PresignMethod::Head => pb::PresignMethod::Head,
1071    }
1072}
1073
1074fn presign_method_from_proto(method: i32) -> PresignMethod {
1075    match pb::PresignMethod::try_from(method).unwrap_or(pb::PresignMethod::Unspecified) {
1076        pb::PresignMethod::Get => PresignMethod::Get,
1077        pb::PresignMethod::Put => PresignMethod::Put,
1078        pb::PresignMethod::Delete => PresignMethod::Delete,
1079        pb::PresignMethod::Head => PresignMethod::Head,
1080        pb::PresignMethod::Unspecified => PresignMethod::Unspecified,
1081    }
1082}
1083
1084fn presign_result_from_proto(
1085    result: pb::PresignObjectResponse,
1086    requested_method: PresignMethod,
1087) -> PresignResult {
1088    let method = presign_method_from_proto(result.method);
1089    PresignResult {
1090        url: result.url,
1091        method: if method == PresignMethod::Unspecified {
1092            requested_method
1093        } else {
1094            method
1095        },
1096        expires_at: result.expires_at,
1097        headers: result.headers,
1098    }
1099}
1100
1101fn object_access_url_from_proto(
1102    result: pb::CreateObjectAccessUrlResponse,
1103    requested_method: PresignMethod,
1104) -> ObjectAccessURL {
1105    let method = presign_method_from_proto(result.method);
1106    ObjectAccessURL {
1107        url: result.url,
1108        method: if method == PresignMethod::Unspecified {
1109            requested_method
1110        } else {
1111            method
1112        },
1113        expires_at: result.expires_at,
1114        headers: result.headers,
1115    }
1116}