1use std::collections::BTreeMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use hyper_util::rt::TokioIo;
6use serde::de::DeserializeOwned;
7use tokio_stream::iter;
8use tonic::codegen::async_trait;
9use tonic::metadata::MetadataValue;
10use tonic::service::Interceptor;
11use tonic::service::interceptor::InterceptedService;
12use tonic::transport::{Channel, ClientTlsConfig, Endpoint, Uri};
13use tower::service_fn;
14
15use crate::api::RuntimeMetadata;
16use crate::error::Result as ProviderResult;
17use crate::generated::v1::{
18 self as pb, s3_client::S3Client as ProtoS3Client,
19 s3_object_access_client::S3ObjectAccessClient as ProtoS3ObjectAccessClient,
20};
21
22type ClientResult<T> = std::result::Result<T, S3Error>;
23type S3Transport = InterceptedService<Channel, RelayTokenInterceptor>;
24
25pub const ENV_S3_SOCKET: &str = "GESTALT_S3_SOCKET";
27pub const ENV_S3_SOCKET_TOKEN_SUFFIX: &str = "_TOKEN";
29pub const ENV_S3_SOCKET_TOKEN: &str = "GESTALT_S3_SOCKET_TOKEN";
31const S3_RELAY_TOKEN_HEADER: &str = "x-gestalt-host-service-relay-token";
32const WRITE_CHUNK_SIZE: usize = 64 * 1024;
33
34#[derive(Debug, thiserror::Error)]
35pub enum S3Error {
37 #[error("not found")]
39 NotFound,
40 #[error("precondition failed")]
42 PreconditionFailed,
43 #[error("invalid range")]
45 InvalidRange,
46 #[error("{0}")]
48 Protocol(String),
49 #[error("{0}")]
51 Transport(#[from] tonic::transport::Error),
52 #[error("{0}")]
54 Status(#[from] tonic::Status),
55 #[error("{0}")]
57 Env(String),
58 #[error("{0}")]
60 Json(#[from] serde_json::Error),
61 #[error("{0}")]
63 Utf8(#[from] std::string::FromUtf8Error),
64}
65
66#[derive(Clone, Debug, Default, Eq, PartialEq)]
67pub struct ObjectRef {
69 pub bucket: String,
71 pub key: String,
73 pub version_id: String,
75}
76
77#[derive(Clone, Debug, Default, PartialEq)]
78pub struct ObjectMeta {
80 pub reference: ObjectRef,
82 pub etag: String,
84 pub size: i64,
86 pub content_type: String,
88 pub last_modified: Option<prost_types::Timestamp>,
90 pub metadata: BTreeMap<String, String>,
92 pub storage_class: String,
94}
95
96#[derive(Clone, Debug, Default, Eq, PartialEq)]
97pub struct ByteRange {
99 pub start: Option<i64>,
101 pub end: Option<i64>,
103}
104
105#[derive(Clone, Debug, Default, PartialEq)]
106pub struct ReadOptions {
108 pub range: Option<ByteRange>,
110 pub if_match: String,
112 pub if_none_match: String,
114 pub if_modified_since: Option<prost_types::Timestamp>,
116 pub if_unmodified_since: Option<prost_types::Timestamp>,
118}
119
120#[derive(Clone, Debug, Default, Eq, PartialEq)]
121pub struct WriteOptions {
123 pub content_type: String,
125 pub cache_control: String,
127 pub content_disposition: String,
129 pub content_encoding: String,
131 pub content_language: String,
133 pub metadata: BTreeMap<String, String>,
135 pub if_match: String,
137 pub if_none_match: String,
139}
140
141#[derive(Clone, Debug, Default, Eq, PartialEq)]
142pub struct ListOptions {
144 pub bucket: String,
146 pub prefix: String,
148 pub delimiter: String,
150 pub continuation_token: String,
152 pub start_after: String,
154 pub max_keys: i32,
156}
157
158#[derive(Clone, Debug, Default, PartialEq)]
159pub struct ListPage {
161 pub objects: Vec<ObjectMeta>,
163 pub common_prefixes: Vec<String>,
165 pub next_continuation_token: String,
167 pub has_more: bool,
169}
170
171#[derive(Clone, Debug, Default, Eq, PartialEq)]
172pub struct CopyOptions {
174 pub if_match: String,
176 pub if_none_match: String,
178}
179
180#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
181pub enum PresignMethod {
183 #[default]
185 Unspecified,
186 Get,
188 Put,
190 Delete,
192 Head,
194}
195
196#[derive(Clone, Debug, Default, Eq, PartialEq)]
197pub struct PresignOptions {
199 pub method: PresignMethod,
201 pub expires: Duration,
203 pub content_type: String,
205 pub content_disposition: String,
207 pub headers: BTreeMap<String, String>,
209}
210
211#[derive(Clone, Debug, Default, PartialEq)]
212pub struct PresignResult {
214 pub url: String,
216 pub method: PresignMethod,
218 pub expires_at: Option<prost_types::Timestamp>,
220 pub headers: BTreeMap<String, String>,
222}
223
224pub type ObjectAccessURLOptions = PresignOptions;
226pub type ObjectAccessURL = PresignResult;
228
229#[async_trait]
230pub trait S3Provider: pb::s3_server::S3 + Send + Sync + 'static {
232 async fn configure(
234 &self,
235 _name: &str,
236 _config: serde_json::Map<String, serde_json::Value>,
237 ) -> ProviderResult<()> {
238 Ok(())
239 }
240
241 fn metadata(&self) -> Option<RuntimeMetadata> {
243 None
244 }
245
246 fn warnings(&self) -> Vec<String> {
248 Vec::new()
249 }
250
251 async fn health_check(&self) -> ProviderResult<()> {
253 Ok(())
254 }
255
256 async fn start(&self) -> ProviderResult<()> {
258 Ok(())
259 }
260
261 async fn close(&self) -> ProviderResult<()> {
263 Ok(())
264 }
265}
266
267#[async_trait]
268impl<T> pb::s3_server::S3 for Arc<T>
269where
270 T: S3Provider,
271{
272 type ReadObjectStream = <T as pb::s3_server::S3>::ReadObjectStream;
273
274 async fn head_object(
275 &self,
276 request: tonic::Request<pb::HeadObjectRequest>,
277 ) -> std::result::Result<tonic::Response<pb::HeadObjectResponse>, tonic::Status> {
278 <T as pb::s3_server::S3>::head_object(self.as_ref(), request).await
279 }
280
281 async fn read_object(
282 &self,
283 request: tonic::Request<pb::ReadObjectRequest>,
284 ) -> std::result::Result<tonic::Response<Self::ReadObjectStream>, tonic::Status> {
285 <T as pb::s3_server::S3>::read_object(self.as_ref(), request).await
286 }
287
288 async fn write_object(
289 &self,
290 request: tonic::Request<tonic::Streaming<pb::WriteObjectRequest>>,
291 ) -> std::result::Result<tonic::Response<pb::WriteObjectResponse>, tonic::Status> {
292 <T as pb::s3_server::S3>::write_object(self.as_ref(), request).await
293 }
294
295 async fn delete_object(
296 &self,
297 request: tonic::Request<pb::DeleteObjectRequest>,
298 ) -> std::result::Result<tonic::Response<()>, tonic::Status> {
299 <T as pb::s3_server::S3>::delete_object(self.as_ref(), request).await
300 }
301
302 async fn list_objects(
303 &self,
304 request: tonic::Request<pb::ListObjectsRequest>,
305 ) -> std::result::Result<tonic::Response<pb::ListObjectsResponse>, tonic::Status> {
306 <T as pb::s3_server::S3>::list_objects(self.as_ref(), request).await
307 }
308
309 async fn copy_object(
310 &self,
311 request: tonic::Request<pb::CopyObjectRequest>,
312 ) -> std::result::Result<tonic::Response<pb::CopyObjectResponse>, tonic::Status> {
313 <T as pb::s3_server::S3>::copy_object(self.as_ref(), request).await
314 }
315
316 async fn presign_object(
317 &self,
318 request: tonic::Request<pb::PresignObjectRequest>,
319 ) -> std::result::Result<tonic::Response<pb::PresignObjectResponse>, tonic::Status> {
320 <T as pb::s3_server::S3>::presign_object(self.as_ref(), request).await
321 }
322}
323
324pub struct S3 {
326 client: ProtoS3Client<S3Transport>,
327 object_access_client: ProtoS3ObjectAccessClient<S3Transport>,
328}
329
330impl S3 {
331 pub async fn connect() -> ClientResult<Self> {
333 Self::connect_named("").await
334 }
335
336 pub async fn connect_named(name: &str) -> ClientResult<Self> {
338 let env_name = s3_socket_env(name);
339 let target =
340 std::env::var(&env_name).map_err(|_| S3Error::Env(format!("{env_name} is not set")))?;
341 let token = std::env::var(s3_socket_token_env(name)).unwrap_or_default();
342
343 let channel = match parse_s3_target(&target)? {
344 S3Target::Unix(path) => {
345 Endpoint::try_from("http://[::]:50051")?
346 .connect_with_connector(service_fn(move |_: Uri| {
347 let path = path.clone();
348 async move {
349 tokio::net::UnixStream::connect(path)
350 .await
351 .map(TokioIo::new)
352 }
353 }))
354 .await?
355 }
356 S3Target::Tcp(address) => {
357 Endpoint::from_shared(format!("http://{address}"))?
358 .connect()
359 .await?
360 }
361 S3Target::Tls(address) => {
362 Endpoint::from_shared(format!("https://{address}"))?
363 .tls_config(ClientTlsConfig::new().with_native_roots())?
364 .connect()
365 .await?
366 }
367 };
368
369 let interceptor = relay_token_interceptor(token.trim())?;
370 Ok(Self {
371 client: ProtoS3Client::with_interceptor(channel.clone(), interceptor.clone()),
372 object_access_client: ProtoS3ObjectAccessClient::with_interceptor(channel, interceptor),
373 })
374 }
375
376 pub fn object(&self, bucket: &str, key: &str) -> Object {
378 Object {
379 client: self.client.clone(),
380 object_access_client: self.object_access_client.clone(),
381 reference: ObjectRef {
382 bucket: bucket.to_string(),
383 key: key.to_string(),
384 version_id: String::new(),
385 },
386 }
387 }
388
389 pub fn object_version(&self, bucket: &str, key: &str, version_id: &str) -> Object {
391 Object {
392 client: self.client.clone(),
393 object_access_client: self.object_access_client.clone(),
394 reference: ObjectRef {
395 bucket: bucket.to_string(),
396 key: key.to_string(),
397 version_id: version_id.to_string(),
398 },
399 }
400 }
401
402 pub async fn head_object(&mut self, reference: ObjectRef) -> ClientResult<ObjectMeta> {
404 let response = self
405 .client
406 .head_object(pb::HeadObjectRequest {
407 r#ref: Some(object_ref_to_proto(reference)),
408 })
409 .await
410 .map_err(map_status)?;
411 required_object_meta(
412 response.into_inner().meta,
413 "head object response missing metadata",
414 )
415 }
416
417 pub async fn read_object(
419 &mut self,
420 reference: ObjectRef,
421 options: Option<ReadOptions>,
422 ) -> ClientResult<ObjectReader> {
423 let options = options.unwrap_or_default();
424 let mut stream = self
425 .client
426 .read_object(pb::ReadObjectRequest {
427 r#ref: Some(object_ref_to_proto(reference)),
428 range: options.range.map(byte_range_to_proto),
429 if_match: options.if_match,
430 if_none_match: options.if_none_match,
431 if_modified_since: options.if_modified_since,
432 if_unmodified_since: options.if_unmodified_since,
433 })
434 .await
435 .map_err(map_status)?
436 .into_inner();
437
438 let first =
439 stream.message().await.map_err(map_status)?.ok_or_else(|| {
440 S3Error::Protocol("read stream ended before metadata".to_string())
441 })?;
442
443 let meta = match first.result {
444 Some(pb::read_object_chunk::Result::Meta(meta)) => object_meta_from_proto(meta),
445 Some(pb::read_object_chunk::Result::Data(_)) => {
446 return Err(S3Error::Protocol(
447 "read stream started with data instead of metadata".to_string(),
448 ));
449 }
450 None => {
451 return Err(S3Error::Protocol(
452 "read stream started with an empty frame".to_string(),
453 ));
454 }
455 };
456
457 Ok(ObjectReader { meta, stream })
458 }
459
460 pub async fn write_object<B>(
462 &mut self,
463 reference: ObjectRef,
464 body: B,
465 options: Option<WriteOptions>,
466 ) -> ClientResult<ObjectMeta>
467 where
468 B: AsRef<[u8]>,
469 {
470 let options = options.unwrap_or_default();
471 let open = pb::WriteObjectRequest {
472 msg: Some(pb::write_object_request::Msg::Open(pb::WriteObjectOpen {
473 r#ref: Some(object_ref_to_proto(reference)),
474 content_type: options.content_type,
475 cache_control: options.cache_control,
476 content_disposition: options.content_disposition,
477 content_encoding: options.content_encoding,
478 content_language: options.content_language,
479 metadata: options.metadata,
480 if_match: options.if_match,
481 if_none_match: options.if_none_match,
482 })),
483 };
484
485 let body = body.as_ref();
486 let data = body
487 .chunks(WRITE_CHUNK_SIZE)
488 .filter(|chunk| !chunk.is_empty())
489 .map(|chunk| pb::WriteObjectRequest {
490 msg: Some(pb::write_object_request::Msg::Data(chunk.to_vec())),
491 })
492 .collect::<Vec<_>>();
493
494 let response = self
495 .client
496 .write_object(iter(std::iter::once(open).chain(data)))
497 .await
498 .map_err(map_status)?;
499 required_object_meta(
500 response.into_inner().meta,
501 "write object response missing metadata",
502 )
503 }
504
505 pub async fn write_object_chunks<I, B>(
507 &mut self,
508 reference: ObjectRef,
509 chunks: I,
510 options: Option<WriteOptions>,
511 ) -> ClientResult<ObjectMeta>
512 where
513 I: IntoIterator<Item = B>,
514 I::IntoIter: Send + 'static,
515 B: AsRef<[u8]> + Send + 'static,
516 {
517 let options = options.unwrap_or_default();
518 let open = std::iter::once(pb::WriteObjectRequest {
519 msg: Some(pb::write_object_request::Msg::Open(pb::WriteObjectOpen {
520 r#ref: Some(object_ref_to_proto(reference)),
521 content_type: options.content_type,
522 cache_control: options.cache_control,
523 content_disposition: options.content_disposition,
524 content_encoding: options.content_encoding,
525 content_language: options.content_language,
526 metadata: options.metadata,
527 if_match: options.if_match,
528 if_none_match: options.if_none_match,
529 })),
530 });
531
532 let data = chunks.into_iter().filter_map(|chunk| {
533 let bytes = chunk.as_ref();
534 if bytes.is_empty() {
535 return None;
536 }
537 Some(pb::WriteObjectRequest {
538 msg: Some(pb::write_object_request::Msg::Data(bytes.to_vec())),
539 })
540 });
541
542 let response = self
543 .client
544 .write_object(iter(open.chain(data)))
545 .await
546 .map_err(map_status)?;
547 required_object_meta(
548 response.into_inner().meta,
549 "write object response missing metadata",
550 )
551 }
552
553 pub async fn delete_object(&mut self, reference: ObjectRef) -> ClientResult<()> {
555 self.client
556 .delete_object(pb::DeleteObjectRequest {
557 r#ref: Some(object_ref_to_proto(reference)),
558 })
559 .await
560 .map_err(map_status)?;
561 Ok(())
562 }
563
564 pub async fn list_objects(&mut self, options: ListOptions) -> ClientResult<ListPage> {
566 let response = self
567 .client
568 .list_objects(pb::ListObjectsRequest {
569 bucket: options.bucket,
570 prefix: options.prefix,
571 delimiter: options.delimiter,
572 continuation_token: options.continuation_token,
573 start_after: options.start_after,
574 max_keys: options.max_keys,
575 })
576 .await
577 .map_err(map_status)?;
578 Ok(list_page_from_proto(response.into_inner()))
579 }
580
581 pub async fn copy_object(
583 &mut self,
584 source: ObjectRef,
585 destination: ObjectRef,
586 options: Option<CopyOptions>,
587 ) -> ClientResult<ObjectMeta> {
588 let options = options.unwrap_or_default();
589 let response = self
590 .client
591 .copy_object(pb::CopyObjectRequest {
592 source: Some(object_ref_to_proto(source)),
593 destination: Some(object_ref_to_proto(destination)),
594 if_match: options.if_match,
595 if_none_match: options.if_none_match,
596 })
597 .await
598 .map_err(map_status)?;
599 required_object_meta(
600 response.into_inner().meta,
601 "copy object response missing metadata",
602 )
603 }
604
605 pub async fn presign_object(
607 &mut self,
608 reference: ObjectRef,
609 options: Option<PresignOptions>,
610 ) -> ClientResult<PresignResult> {
611 let options = options.unwrap_or_default();
612 let expires_seconds = i64::try_from(options.expires.as_secs()).unwrap_or(i64::MAX);
613 let response = self
614 .client
615 .presign_object(pb::PresignObjectRequest {
616 r#ref: Some(object_ref_to_proto(reference)),
617 method: presign_method_to_proto(options.method) as i32,
618 expires_seconds,
619 content_type: options.content_type,
620 content_disposition: options.content_disposition,
621 headers: options.headers,
622 })
623 .await
624 .map_err(map_status)?;
625 Ok(presign_result_from_proto(
626 response.into_inner(),
627 options.method,
628 ))
629 }
630
631 pub async fn create_object_access_url(
633 &mut self,
634 reference: ObjectRef,
635 options: Option<ObjectAccessURLOptions>,
636 ) -> ClientResult<ObjectAccessURL> {
637 let options = options.unwrap_or_default();
638 let expires_seconds = i64::try_from(options.expires.as_secs()).unwrap_or(i64::MAX);
639 let response = self
640 .object_access_client
641 .create_object_access_url(pb::CreateObjectAccessUrlRequest {
642 r#ref: Some(object_ref_to_proto(reference)),
643 method: presign_method_to_proto(options.method) as i32,
644 expires_seconds,
645 content_type: options.content_type,
646 content_disposition: options.content_disposition,
647 headers: options.headers,
648 })
649 .await
650 .map_err(map_status)?;
651 Ok(object_access_url_from_proto(
652 response.into_inner(),
653 options.method,
654 ))
655 }
656
657 pub async fn create_access_url(
659 &mut self,
660 reference: ObjectRef,
661 options: Option<ObjectAccessURLOptions>,
662 ) -> ClientResult<ObjectAccessURL> {
663 self.create_object_access_url(reference, options).await
664 }
665}
666
667pub struct Object {
669 client: ProtoS3Client<S3Transport>,
670 object_access_client: ProtoS3ObjectAccessClient<S3Transport>,
671 reference: ObjectRef,
672}
673
674impl Object {
675 pub fn reference(&self) -> &ObjectRef {
677 &self.reference
678 }
679
680 pub async fn stat(&mut self) -> ClientResult<ObjectMeta> {
682 let mut client = S3 {
683 client: self.client.clone(),
684 object_access_client: self.object_access_client.clone(),
685 };
686 client.head_object(self.reference.clone()).await
687 }
688
689 pub async fn exists(&mut self) -> ClientResult<bool> {
691 match self.stat().await {
692 Ok(_) => Ok(true),
693 Err(S3Error::NotFound) => Ok(false),
694 Err(error) => Err(error),
695 }
696 }
697
698 pub async fn stream(&mut self, options: Option<ReadOptions>) -> ClientResult<ObjectReader> {
700 let mut client = S3 {
701 client: self.client.clone(),
702 object_access_client: self.object_access_client.clone(),
703 };
704 client.read_object(self.reference.clone(), options).await
705 }
706
707 pub async fn bytes(&mut self, options: Option<ReadOptions>) -> ClientResult<Vec<u8>> {
709 self.stream(options).await?.bytes().await
710 }
711
712 pub async fn text(&mut self, options: Option<ReadOptions>) -> ClientResult<String> {
714 self.stream(options).await?.text().await
715 }
716
717 pub async fn json<T>(&mut self, options: Option<ReadOptions>) -> ClientResult<T>
719 where
720 T: DeserializeOwned,
721 {
722 self.stream(options).await?.json().await
723 }
724
725 pub async fn write<B>(
727 &mut self,
728 body: B,
729 options: Option<WriteOptions>,
730 ) -> ClientResult<ObjectMeta>
731 where
732 B: AsRef<[u8]>,
733 {
734 let mut client = S3 {
735 client: self.client.clone(),
736 object_access_client: self.object_access_client.clone(),
737 };
738 client
739 .write_object(self.reference.clone(), body, options)
740 .await
741 }
742
743 pub async fn write_chunks<I, B>(
745 &mut self,
746 chunks: I,
747 options: Option<WriteOptions>,
748 ) -> ClientResult<ObjectMeta>
749 where
750 I: IntoIterator<Item = B>,
751 I::IntoIter: Send + 'static,
752 B: AsRef<[u8]> + Send + 'static,
753 {
754 let mut client = S3 {
755 client: self.client.clone(),
756 object_access_client: self.object_access_client.clone(),
757 };
758 client
759 .write_object_chunks(self.reference.clone(), chunks, options)
760 .await
761 }
762
763 pub async fn write_bytes(
765 &mut self,
766 body: impl AsRef<[u8]>,
767 options: Option<WriteOptions>,
768 ) -> ClientResult<ObjectMeta> {
769 self.write(body, options).await
770 }
771
772 pub async fn write_string(
774 &mut self,
775 body: impl AsRef<str>,
776 options: Option<WriteOptions>,
777 ) -> ClientResult<ObjectMeta> {
778 self.write(body.as_ref().as_bytes(), options).await
779 }
780
781 pub async fn write_json<T>(
783 &mut self,
784 value: &T,
785 options: Option<WriteOptions>,
786 ) -> ClientResult<ObjectMeta>
787 where
788 T: serde::Serialize + ?Sized,
789 {
790 let body = serde_json::to_vec(value)?;
791 let options = match options {
792 Some(mut options) => {
793 if options.content_type.is_empty() {
794 options.content_type = "application/json".to_string();
795 }
796 Some(options)
797 }
798 None => Some(WriteOptions {
799 content_type: "application/json".to_string(),
800 ..WriteOptions::default()
801 }),
802 };
803 self.write(body, options).await
804 }
805
806 pub async fn delete(&mut self) -> ClientResult<()> {
808 let mut client = S3 {
809 client: self.client.clone(),
810 object_access_client: self.object_access_client.clone(),
811 };
812 client.delete_object(self.reference.clone()).await
813 }
814
815 pub async fn presign(
817 &mut self,
818 options: Option<PresignOptions>,
819 ) -> ClientResult<PresignResult> {
820 let mut client = S3 {
821 client: self.client.clone(),
822 object_access_client: self.object_access_client.clone(),
823 };
824 client.presign_object(self.reference.clone(), options).await
825 }
826
827 pub async fn create_access_url(
829 &mut self,
830 options: Option<ObjectAccessURLOptions>,
831 ) -> ClientResult<ObjectAccessURL> {
832 let mut client = S3 {
833 client: self.client.clone(),
834 object_access_client: self.object_access_client.clone(),
835 };
836 client
837 .create_object_access_url(self.reference.clone(), options)
838 .await
839 }
840}
841
842pub struct ObjectReader {
844 meta: ObjectMeta,
845 stream: tonic::Streaming<pb::ReadObjectChunk>,
846}
847
848impl ObjectReader {
849 pub fn meta(&self) -> &ObjectMeta {
851 &self.meta
852 }
853
854 pub async fn next_chunk(&mut self) -> ClientResult<Option<Vec<u8>>> {
856 loop {
857 let Some(message) = self.stream.message().await.map_err(map_status)? else {
858 return Ok(None);
859 };
860
861 match message.result {
862 Some(pb::read_object_chunk::Result::Data(data)) => {
863 if data.is_empty() {
864 continue;
865 }
866 return Ok(Some(data));
867 }
868 Some(pb::read_object_chunk::Result::Meta(_)) => {
869 return Err(S3Error::Protocol(
870 "read stream emitted metadata after the initial frame".to_string(),
871 ));
872 }
873 None => continue,
874 }
875 }
876 }
877
878 pub async fn bytes(mut self) -> ClientResult<Vec<u8>> {
880 let mut body = Vec::new();
881 while let Some(chunk) = self.next_chunk().await? {
882 body.extend_from_slice(&chunk);
883 }
884 Ok(body)
885 }
886
887 pub async fn text(self) -> ClientResult<String> {
889 Ok(String::from_utf8(self.bytes().await?)?)
890 }
891
892 pub async fn json<T>(self) -> ClientResult<T>
894 where
895 T: DeserializeOwned,
896 {
897 Ok(serde_json::from_slice(&self.bytes().await?)?)
898 }
899}
900
901pub fn s3_socket_env(name: &str) -> String {
903 let trimmed = name.trim();
904 if trimmed.is_empty() {
905 return ENV_S3_SOCKET.to_string();
906 }
907 let mut env = String::from(ENV_S3_SOCKET);
908 env.push('_');
909 for ch in trimmed.chars() {
910 if ch.is_ascii_alphanumeric() {
911 env.push(ch.to_ascii_uppercase());
912 } else {
913 env.push('_');
914 }
915 }
916 env
917}
918
919pub fn s3_socket_token_env(name: &str) -> String {
921 if name.trim().is_empty() {
922 return ENV_S3_SOCKET_TOKEN.to_string();
923 }
924 format!("{}{}", s3_socket_env(name), ENV_S3_SOCKET_TOKEN_SUFFIX)
925}
926
927enum S3Target {
928 Unix(String),
929 Tcp(String),
930 Tls(String),
931}
932
933fn parse_s3_target(raw_target: &str) -> Result<S3Target, S3Error> {
934 let target = raw_target.trim();
935 if target.is_empty() {
936 return Err(S3Error::Env("S3 transport target is required".to_string()));
937 }
938 if let Some(address) = target.strip_prefix("tcp://") {
939 let address = address.trim();
940 if address.is_empty() {
941 return Err(S3Error::Env(format!(
942 "S3 tcp target {raw_target:?} is missing host:port"
943 )));
944 }
945 return Ok(S3Target::Tcp(address.to_string()));
946 }
947 if let Some(address) = target.strip_prefix("tls://") {
948 let address = address.trim();
949 if address.is_empty() {
950 return Err(S3Error::Env(format!(
951 "S3 tls target {raw_target:?} is missing host:port"
952 )));
953 }
954 return Ok(S3Target::Tls(address.to_string()));
955 }
956 if let Some(path) = target.strip_prefix("unix://") {
957 let path = path.trim();
958 if path.is_empty() {
959 return Err(S3Error::Env(format!(
960 "S3 unix target {raw_target:?} is missing a socket path"
961 )));
962 }
963 return Ok(S3Target::Unix(path.to_string()));
964 }
965 if target.contains("://") {
966 let scheme = target.split("://").next().unwrap_or_default();
967 return Err(S3Error::Env(format!(
968 "unsupported S3 target scheme {scheme:?}"
969 )));
970 }
971 Ok(S3Target::Unix(target.to_string()))
972}
973
974fn relay_token_interceptor(token: &str) -> Result<RelayTokenInterceptor, S3Error> {
975 let header = if token.trim().is_empty() {
976 None
977 } else {
978 Some(
979 MetadataValue::try_from(token.to_string())
980 .map_err(|err| S3Error::Env(format!("invalid S3 relay token metadata: {err}")))?,
981 )
982 };
983 Ok(RelayTokenInterceptor { header })
984}
985
986#[derive(Clone)]
987struct RelayTokenInterceptor {
988 header: Option<MetadataValue<tonic::metadata::Ascii>>,
989}
990
991impl Interceptor for RelayTokenInterceptor {
992 fn call(
993 &mut self,
994 mut request: tonic::Request<()>,
995 ) -> std::result::Result<tonic::Request<()>, tonic::Status> {
996 if let Some(header) = self.header.clone() {
997 request.metadata_mut().insert(S3_RELAY_TOKEN_HEADER, header);
998 }
999 Ok(request)
1000 }
1001}
1002
1003fn map_status(err: tonic::Status) -> S3Error {
1004 match err.code() {
1005 tonic::Code::NotFound => S3Error::NotFound,
1006 tonic::Code::FailedPrecondition => S3Error::PreconditionFailed,
1007 tonic::Code::OutOfRange => S3Error::InvalidRange,
1008 _ => S3Error::Status(err),
1009 }
1010}
1011
1012fn object_ref_to_proto(reference: ObjectRef) -> pb::S3ObjectRef {
1013 pb::S3ObjectRef {
1014 bucket: reference.bucket,
1015 key: reference.key,
1016 version_id: reference.version_id,
1017 }
1018}
1019
1020fn object_meta_from_proto(meta: pb::S3ObjectMeta) -> ObjectMeta {
1021 ObjectMeta {
1022 reference: meta
1023 .r#ref
1024 .map(|reference| ObjectRef {
1025 bucket: reference.bucket,
1026 key: reference.key,
1027 version_id: reference.version_id,
1028 })
1029 .unwrap_or_default(),
1030 etag: meta.etag,
1031 size: meta.size,
1032 content_type: meta.content_type,
1033 last_modified: meta.last_modified,
1034 metadata: meta.metadata,
1035 storage_class: meta.storage_class,
1036 }
1037}
1038
1039fn required_object_meta(meta: Option<pb::S3ObjectMeta>, context: &str) -> ClientResult<ObjectMeta> {
1040 let meta = meta.ok_or_else(|| S3Error::Protocol(context.to_string()))?;
1041 Ok(object_meta_from_proto(meta))
1042}
1043
1044fn byte_range_to_proto(range: ByteRange) -> pb::ByteRange {
1045 pb::ByteRange {
1046 start: range.start,
1047 end: range.end,
1048 }
1049}
1050
1051fn list_page_from_proto(page: pb::ListObjectsResponse) -> ListPage {
1052 ListPage {
1053 objects: page
1054 .objects
1055 .into_iter()
1056 .map(object_meta_from_proto)
1057 .collect(),
1058 common_prefixes: page.common_prefixes,
1059 next_continuation_token: page.next_continuation_token,
1060 has_more: page.has_more,
1061 }
1062}
1063
1064fn presign_method_to_proto(method: PresignMethod) -> pb::PresignMethod {
1065 match method {
1066 PresignMethod::Unspecified => pb::PresignMethod::Unspecified,
1067 PresignMethod::Get => pb::PresignMethod::Get,
1068 PresignMethod::Put => pb::PresignMethod::Put,
1069 PresignMethod::Delete => pb::PresignMethod::Delete,
1070 PresignMethod::Head => pb::PresignMethod::Head,
1071 }
1072}
1073
1074fn presign_method_from_proto(method: i32) -> PresignMethod {
1075 match pb::PresignMethod::try_from(method).unwrap_or(pb::PresignMethod::Unspecified) {
1076 pb::PresignMethod::Get => PresignMethod::Get,
1077 pb::PresignMethod::Put => PresignMethod::Put,
1078 pb::PresignMethod::Delete => PresignMethod::Delete,
1079 pb::PresignMethod::Head => PresignMethod::Head,
1080 pb::PresignMethod::Unspecified => PresignMethod::Unspecified,
1081 }
1082}
1083
1084fn presign_result_from_proto(
1085 result: pb::PresignObjectResponse,
1086 requested_method: PresignMethod,
1087) -> PresignResult {
1088 let method = presign_method_from_proto(result.method);
1089 PresignResult {
1090 url: result.url,
1091 method: if method == PresignMethod::Unspecified {
1092 requested_method
1093 } else {
1094 method
1095 },
1096 expires_at: result.expires_at,
1097 headers: result.headers,
1098 }
1099}
1100
1101fn object_access_url_from_proto(
1102 result: pb::CreateObjectAccessUrlResponse,
1103 requested_method: PresignMethod,
1104) -> ObjectAccessURL {
1105 let method = presign_method_from_proto(result.method);
1106 ObjectAccessURL {
1107 url: result.url,
1108 method: if method == PresignMethod::Unspecified {
1109 requested_method
1110 } else {
1111 method
1112 },
1113 expires_at: result.expires_at,
1114 headers: result.headers,
1115 }
1116}