use crate::codec::host_service::{HostServiceChannel, connect_host_service, plain_channel};
use crate::codec::s3::{
from_wire_copy_object_response, from_wire_create_object_access_url_response,
from_wire_head_object_response, from_wire_list_objects_response,
from_wire_presign_object_response, from_wire_read_object_chunk,
from_wire_write_object_response, to_wire_copy_object_request,
to_wire_create_object_access_url_request, to_wire_delete_object_request,
to_wire_head_object_request, to_wire_list_objects_request, to_wire_presign_object_request,
to_wire_read_object_request, to_wire_write_object_request,
};
use crate::generated::v1;
use crate::rpc_support::{GestaltError, gestalt_error_code};
use tokio_stream::StreamExt;
/// Integer code selecting a presign method; named values are declared in the
/// `presign_method` module.
pub type PresignMethod = i32;
/// Named `i32` constants intended for use as `PresignMethod` values.
///
/// NOTE(review): presumably these mirror an enum in the wire schema — confirm
/// against the service definition before adding or renumbering values.
pub mod presign_method {
/// Zero value: no method specified.
pub const PRESIGN_METHOD_UNSPECIFIED: i32 = 0;
/// Code for the `GET` method.
pub const PRESIGN_METHOD_GET: i32 = 1;
/// Code for the `PUT` method.
pub const PRESIGN_METHOD_PUT: i32 = 2;
/// Code for the `DELETE` method.
pub const PRESIGN_METHOD_DELETE: i32 = 3;
/// Code for the `HEAD` method.
pub const PRESIGN_METHOD_HEAD: i32 = 4;
}
/// Optional `start`/`end` bounds carried in `ReadObjectRequest::range`.
///
/// NOTE(review): the units and inclusivity of the bounds are not visible in
/// this file — confirm against the service definition.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ByteRange {
/// Lower bound; `None` leaves it unset.
pub start: Option<i64>,
/// Upper bound; `None` leaves it unset.
pub end: Option<i64>,
}
/// Request message accepted by `S3::copy_object_raw` (and assembled by
/// `S3::copy_object`).
#[derive(Clone, Debug, Default, PartialEq)]
pub struct CopyObjectRequest {
/// Reference to the object to copy from.
pub source: Option<S3ObjectRef>,
/// Reference to the object to copy to.
pub destination: Option<S3ObjectRef>,
/// Conditional value forwarded with the request; empty by default.
/// NOTE(review): presumably an ETag precondition — confirm.
pub if_match: String,
/// Conditional value forwarded with the request; empty by default.
/// NOTE(review): presumably an ETag precondition — confirm.
pub if_none_match: String,
}
/// Response returned by `S3::copy_object` and `S3::copy_object_raw`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct CopyObjectResponse {
/// Object metadata reported in the response, if any.
pub meta: Option<S3ObjectMeta>,
}
/// Request message accepted by `S3ObjectAccess::create_object_access_url_raw`
/// (and assembled by `S3ObjectAccess::create_object_access_url`).
#[derive(Clone, Debug, Default, PartialEq)]
pub struct CreateObjectAccessURLRequest {
/// Reference to the target object.
pub r#ref: Option<S3ObjectRef>,
/// Method code; see the `presign_method` constants.
pub method: PresignMethod,
/// Requested lifetime. NOTE(review): presumably seconds, per the field
/// name — confirm.
pub expires_seconds: i64,
/// Content type value forwarded with the request; empty by default.
pub content_type: String,
/// Content disposition value forwarded with the request; empty by default.
pub content_disposition: String,
/// Additional headers. `S3ObjectAccess::create_object_access_url` always
/// leaves this at its default (empty); use the `_raw` variant to set it.
pub headers: std::collections::BTreeMap<String, String>,
}
/// Response returned by the `S3ObjectAccess::create_object_access_url*` calls.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct CreateObjectAccessURLResponse {
/// The generated URL.
pub url: String,
/// Method code; see the `presign_method` constants.
pub method: PresignMethod,
/// Expiry timestamp, when the response carries one.
pub expires_at: Option<std::time::SystemTime>,
/// Headers reported in the response.
/// NOTE(review): presumably headers the caller must send when using
/// `url` — confirm.
pub headers: std::collections::BTreeMap<String, String>,
}
/// Request message accepted by `S3::delete_object_raw` (and assembled by
/// `S3::delete_object`).
#[derive(Clone, Debug, Default, PartialEq)]
pub struct DeleteObjectRequest {
/// Reference to the object to delete.
pub r#ref: Option<S3ObjectRef>,
}
/// Request message accepted by `S3::head_object_raw` (and assembled by
/// `S3::head_object`).
#[derive(Clone, Debug, Default, PartialEq)]
pub struct HeadObjectRequest {
/// Reference to the object to inspect.
pub r#ref: Option<S3ObjectRef>,
}
/// Response returned by `S3::head_object` and `S3::head_object_raw`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct HeadObjectResponse {
/// Object metadata reported in the response, if any.
pub meta: Option<S3ObjectMeta>,
}
/// Request message accepted by `S3::list_objects_raw` (and assembled by
/// `S3::list_objects`). All string fields default to empty and `max_keys`
/// defaults to `0`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ListObjectsRequest {
/// Key prefix filter.
pub prefix: String,
/// Delimiter value forwarded with the request.
/// NOTE(review): presumably groups keys into
/// `ListObjectsResponse::common_prefixes` — confirm.
pub delimiter: String,
/// Continuation token forwarded with the request.
/// NOTE(review): presumably taken from a previous response's
/// `next_continuation_token` — confirm.
pub continuation_token: String,
/// `start_after` value forwarded with the request.
pub start_after: String,
/// Maximum number of keys requested.
/// NOTE(review): the meaning of `0` is not visible here — confirm.
pub max_keys: i32,
}
/// Response returned by `S3::list_objects` and `S3::list_objects_raw`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ListObjectsResponse {
/// Metadata for each listed object.
pub objects: Vec<S3ObjectMeta>,
/// Common prefixes reported in the response.
pub common_prefixes: Vec<String>,
/// Continuation token reported in the response.
pub next_continuation_token: String,
/// Flag reported in the response.
/// NOTE(review): presumably signals that another page is available — confirm.
pub has_more: bool,
}
/// Request message accepted by `S3::presign_object_raw` (and assembled by
/// `S3::presign_object`).
#[derive(Clone, Debug, Default, PartialEq)]
pub struct PresignObjectRequest {
/// Reference to the target object.
pub r#ref: Option<S3ObjectRef>,
/// Method code; see the `presign_method` constants.
pub method: PresignMethod,
/// Requested lifetime. NOTE(review): presumably seconds, per the field
/// name — confirm.
pub expires_seconds: i64,
/// Content type value forwarded with the request; empty by default.
pub content_type: String,
/// Content disposition value forwarded with the request; empty by default.
pub content_disposition: String,
/// Additional headers. `S3::presign_object` always leaves this at its
/// default (empty); use `S3::presign_object_raw` to set it.
pub headers: std::collections::BTreeMap<String, String>,
}
/// Response returned by `S3::presign_object` and `S3::presign_object_raw`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct PresignObjectResponse {
/// The generated URL.
pub url: String,
/// Method code; see the `presign_method` constants.
pub method: PresignMethod,
/// Expiry timestamp, when the response carries one.
pub expires_at: Option<std::time::SystemTime>,
/// Headers reported in the response.
/// NOTE(review): presumably headers the caller must send when using
/// `url` — confirm.
pub headers: std::collections::BTreeMap<String, String>,
}
/// Payload of a `ReadObjectChunk`: either object metadata or a run of bytes.
///
/// `S3::read_object` requires the first frame of a stream to be `Meta`, and
/// `S3ReadObjectData::recv` requires every later frame to be `Data`.
#[allow(clippy::enum_variant_names, clippy::large_enum_variant)]
#[derive(Clone, Debug, PartialEq)]
pub enum ReadObjectChunkResult {
/// Object metadata frame.
Meta(S3ObjectMeta),
/// Frame carrying a chunk of object bytes.
Data(Vec<u8>),
}
/// One frame of a read-object stream, as yielded by `S3ReadObjectStream::recv`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ReadObjectChunk {
/// The frame's payload; `None` when the frame carried neither variant.
pub result: Option<ReadObjectChunkResult>,
}
/// Request message accepted by `S3::read_object` and `S3::read_object_raw`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct ReadObjectRequest {
/// Reference to the object to read.
pub r#ref: Option<S3ObjectRef>,
/// Optional byte range; `None` leaves it unset.
pub range: Option<ByteRange>,
/// Conditional value forwarded with the request; empty by default.
/// NOTE(review): presumably an ETag precondition — confirm.
pub if_match: String,
/// Conditional value forwarded with the request; empty by default.
/// NOTE(review): presumably an ETag precondition — confirm.
pub if_none_match: String,
/// Optional timestamp forwarded as `if_modified_since`.
pub if_modified_since: Option<std::time::SystemTime>,
/// Optional timestamp forwarded as `if_unmodified_since`.
pub if_unmodified_since: Option<std::time::SystemTime>,
}
/// Object metadata as reported by head, copy, write, list and read responses.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct S3ObjectMeta {
/// Reference to the object this metadata describes.
pub r#ref: Option<S3ObjectRef>,
/// ETag string reported for the object.
pub etag: String,
/// Object size. NOTE(review): presumably in bytes — confirm.
pub size: i64,
/// Content type reported for the object.
pub content_type: String,
/// Last-modified timestamp, when reported.
pub last_modified: Option<std::time::SystemTime>,
/// String key/value metadata reported for the object.
pub metadata: std::collections::BTreeMap<String, String>,
/// Storage class string reported for the object.
pub storage_class: String,
}
/// Identifies an object by key and version id.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct S3ObjectRef {
/// Object key.
pub key: String,
/// Version identifier; empty by default.
/// NOTE(review): presumably empty selects the latest version — confirm.
pub version_id: String,
}
/// Opening message of a write-object stream. `S3::write_object` sends it as
/// the first request, ahead of any data chunks.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct WriteObjectOpen {
/// Reference to the object to write.
pub r#ref: Option<S3ObjectRef>,
/// Content type value for the object; empty by default.
pub content_type: String,
/// Cache-control value for the object; empty by default.
pub cache_control: String,
/// Content disposition value for the object; empty by default.
pub content_disposition: String,
/// Content encoding value for the object; empty by default.
pub content_encoding: String,
/// Content language value for the object; empty by default.
pub content_language: String,
/// String key/value metadata to attach to the object.
pub metadata: std::collections::BTreeMap<String, String>,
/// Conditional value forwarded with the request; empty by default.
/// NOTE(review): presumably an ETag precondition — confirm.
pub if_match: String,
/// Conditional value forwarded with the request; empty by default.
/// NOTE(review): presumably an ETag precondition — confirm.
pub if_none_match: String,
}
/// Payload of a `WriteObjectRequest`: either the opening message or a chunk
/// of object bytes.
#[allow(clippy::enum_variant_names, clippy::large_enum_variant)]
#[derive(Clone, Debug, PartialEq)]
pub enum WriteObjectRequestMsg {
/// Opening message describing the object being written.
Open(WriteObjectOpen),
/// Chunk of object bytes.
Data(Vec<u8>),
}
/// One message of a write-object request stream, as consumed by
/// `S3::write_object_raw`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct WriteObjectRequest {
/// The message payload; `None` by default.
pub msg: Option<WriteObjectRequestMsg>,
}
/// Response returned by `S3::write_object` and `S3::write_object_raw`.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct WriteObjectResponse {
/// Object metadata reported in the response, if any.
pub meta: Option<S3ObjectMeta>,
}
/// Client wrapper around the generated `v1` S3 gRPC client.
pub struct S3 {
// Generated client running over a `HostServiceChannel`.
inner: v1::s3_client::S3Client<HostServiceChannel>,
// Timeout set on unary requests when present; `None` sets no timeout.
timeout: Option<std::time::Duration>,
}
impl S3 {
    /// Builds a client on top of an existing `tonic` channel (wrapped with
    /// `plain_channel`). No timeout is configured.
    pub fn new(channel: tonic::transport::Channel) -> Self {
        let inner = v1::s3_client::S3Client::new(plain_channel(channel));
        Self {
            inner,
            timeout: None,
        }
    }

    /// Returns the client with `timeout` stored; it is set on every unary
    /// request issued afterwards. The streaming calls (`read_object*`,
    /// `write_object*`) do not set it.
    pub fn with_timeout(mut self, timeout: std::time::Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    /// Shorthand for `connect_named("")`.
    pub async fn connect() -> Result<Self, GestaltError> {
        Self::connect_named("").await
    }

    /// Connects through `connect_host_service("s3", name)` and wraps the
    /// resulting channel. No timeout is configured.
    ///
    /// # Errors
    /// Propagates any error returned by `connect_host_service`.
    pub async fn connect_named(name: &str) -> Result<Self, GestaltError> {
        let channel = connect_host_service("s3", name).await?;
        Ok(Self {
            inner: v1::s3_client::S3Client::new(channel),
            timeout: None,
        })
    }

    /// Wraps `message` in a `tonic::Request`, setting the configured timeout
    /// on it when one is present.
    fn unary_request<T>(&self, message: T) -> tonic::Request<T> {
        let mut wrapped = tonic::Request::new(message);
        if let Some(limit) = self.timeout {
            wrapped.set_timeout(limit);
        }
        wrapped
    }

    /// Issues a head-object call for `ref`; see [`Self::head_object_raw`].
    pub async fn head_object(
        &mut self,
        r#ref: Option<S3ObjectRef>,
    ) -> Result<HeadObjectResponse, GestaltError> {
        self.head_object_raw(HeadObjectRequest { r#ref }).await
    }

    /// Sends `request` as a unary head-object call (with the configured
    /// timeout, if any) and converts the wire response.
    ///
    /// # Errors
    /// Propagates the error produced by the underlying RPC.
    pub async fn head_object_raw(
        &mut self,
        request: HeadObjectRequest,
    ) -> Result<HeadObjectResponse, GestaltError> {
        let call = self.unary_request(to_wire_head_object_request(request));
        let reply = self.inner.head_object(call).await?.into_inner();
        Ok(from_wire_head_object_response(reply))
    }

    /// Starts a read-object stream and consumes its first frame, which must
    /// carry the object metadata. Returns that metadata together with a
    /// handle over the remaining data frames. The configured timeout is not
    /// set on this call.
    ///
    /// # Errors
    /// Propagates RPC errors, and returns an `INTERNAL` error when the stream
    /// ends immediately or its first frame is not a metadata frame.
    pub async fn read_object(
        &mut self,
        request: ReadObjectRequest,
    ) -> Result<(S3ObjectMeta, S3ReadObjectData), GestaltError> {
        let mut frames = self
            .inner
            .read_object(to_wire_read_object_request(request))
            .await?
            .into_inner();
        let header = frames
            .message()
            .await?
            .map(from_wire_read_object_chunk)
            .and_then(|chunk| chunk.result);
        if let Some(ReadObjectChunkResult::Meta(meta)) = header {
            return Ok((meta, S3ReadObjectData { inner: frames }));
        }
        Err(GestaltError::new(
            gestalt_error_code::INTERNAL,
            "stream did not begin with the expected header frame",
        ))
    }

    /// Starts a read-object stream and hands back every frame unprocessed,
    /// including the leading metadata frame. The configured timeout is not
    /// set on this call.
    ///
    /// # Errors
    /// Propagates the error produced by the underlying RPC.
    pub async fn read_object_raw(
        &mut self,
        request: ReadObjectRequest,
    ) -> Result<S3ReadObjectStream, GestaltError> {
        let frames = self
            .inner
            .read_object(to_wire_read_object_request(request))
            .await?
            .into_inner();
        Ok(S3ReadObjectStream { inner: frames })
    }

    /// Streams an object upload: `open` is sent first, followed by one data
    /// message per item of `data`; see [`Self::write_object_raw`].
    pub async fn write_object(
        &mut self,
        open: WriteObjectOpen,
        data: impl tokio_stream::Stream<Item = Vec<u8>> + Send + 'static,
    ) -> Result<WriteObjectResponse, GestaltError> {
        let opening = WriteObjectRequest {
            msg: Some(WriteObjectRequestMsg::Open(open)),
        };
        let payload = data.map(|bytes| WriteObjectRequest {
            msg: Some(WriteObjectRequestMsg::Data(bytes)),
        });
        self.write_object_raw(tokio_stream::once(opening).chain(payload))
            .await
    }

    /// Sends a caller-assembled stream of write-object messages and converts
    /// the wire response. The configured timeout is not set on this call.
    ///
    /// # Errors
    /// Propagates the error produced by the underlying RPC.
    pub async fn write_object_raw(
        &mut self,
        requests: impl tokio_stream::Stream<Item = WriteObjectRequest> + Send + 'static,
    ) -> Result<WriteObjectResponse, GestaltError> {
        let outbound = requests.map(to_wire_write_object_request);
        let reply = self.inner.write_object(outbound).await?.into_inner();
        Ok(from_wire_write_object_response(reply))
    }

    /// Issues a delete-object call for `ref`; see [`Self::delete_object_raw`].
    pub async fn delete_object(&mut self, r#ref: Option<S3ObjectRef>) -> Result<(), GestaltError> {
        self.delete_object_raw(DeleteObjectRequest { r#ref }).await
    }

    /// Sends `request` as a unary delete-object call (with the configured
    /// timeout, if any). The response body is discarded.
    ///
    /// # Errors
    /// Propagates the error produced by the underlying RPC.
    pub async fn delete_object_raw(
        &mut self,
        request: DeleteObjectRequest,
    ) -> Result<(), GestaltError> {
        let call = self.unary_request(to_wire_delete_object_request(request));
        self.inner.delete_object(call).await?;
        Ok(())
    }

    /// Issues a list-objects call built from the individual arguments; see
    /// [`Self::list_objects_raw`].
    pub async fn list_objects(
        &mut self,
        prefix: String,
        delimiter: String,
        continuation_token: String,
        start_after: String,
        max_keys: i32,
    ) -> Result<ListObjectsResponse, GestaltError> {
        self.list_objects_raw(ListObjectsRequest {
            prefix,
            delimiter,
            continuation_token,
            start_after,
            max_keys,
        })
        .await
    }

    /// Sends `request` as a unary list-objects call (with the configured
    /// timeout, if any) and converts the wire response.
    ///
    /// # Errors
    /// Propagates the error produced by the underlying RPC.
    pub async fn list_objects_raw(
        &mut self,
        request: ListObjectsRequest,
    ) -> Result<ListObjectsResponse, GestaltError> {
        let call = self.unary_request(to_wire_list_objects_request(request));
        let reply = self.inner.list_objects(call).await?.into_inner();
        Ok(from_wire_list_objects_response(reply))
    }

    /// Issues a copy-object call built from the individual arguments; see
    /// [`Self::copy_object_raw`]. Note the argument order: the two
    /// conditional values come before `source` and `destination`.
    pub async fn copy_object(
        &mut self,
        if_match: String,
        if_none_match: String,
        source: Option<S3ObjectRef>,
        destination: Option<S3ObjectRef>,
    ) -> Result<CopyObjectResponse, GestaltError> {
        self.copy_object_raw(CopyObjectRequest {
            source,
            destination,
            if_match,
            if_none_match,
        })
        .await
    }

    /// Sends `request` as a unary copy-object call (with the configured
    /// timeout, if any) and converts the wire response.
    ///
    /// # Errors
    /// Propagates the error produced by the underlying RPC.
    pub async fn copy_object_raw(
        &mut self,
        request: CopyObjectRequest,
    ) -> Result<CopyObjectResponse, GestaltError> {
        let call = self.unary_request(to_wire_copy_object_request(request));
        let reply = self.inner.copy_object(call).await?.into_inner();
        Ok(from_wire_copy_object_response(reply))
    }

    /// Issues a presign-object call built from the individual arguments and
    /// `options`; see [`Self::presign_object_raw`]. The request's `headers`
    /// map is always left at its default (empty) on this path.
    pub async fn presign_object(
        &mut self,
        method: PresignMethod,
        expires_seconds: i64,
        r#ref: Option<S3ObjectRef>,
        options: S3PresignObjectOptions,
    ) -> Result<PresignObjectResponse, GestaltError> {
        self.presign_object_raw(PresignObjectRequest {
            r#ref,
            method,
            expires_seconds,
            content_type: options.content_type,
            content_disposition: options.content_disposition,
            ..Default::default()
        })
        .await
    }

    /// Sends `request` as a unary presign-object call (with the configured
    /// timeout, if any) and converts the wire response.
    ///
    /// # Errors
    /// Propagates the error produced by the underlying RPC.
    pub async fn presign_object_raw(
        &mut self,
        request: PresignObjectRequest,
    ) -> Result<PresignObjectResponse, GestaltError> {
        let call = self.unary_request(to_wire_presign_object_request(request));
        let reply = self.inner.presign_object(call).await?.into_inner();
        Ok(from_wire_presign_object_response(reply))
    }
}
/// Optional settings for `S3::presign_object`; both fields default to empty
/// strings and are copied into the matching `PresignObjectRequest` fields.
#[derive(Clone, Debug, Default)]
pub struct S3PresignObjectOptions {
/// Copied into `PresignObjectRequest::content_type`.
pub content_type: String,
/// Copied into `PresignObjectRequest::content_disposition`.
pub content_disposition: String,
}
/// Handle over the data frames of a read-object stream, returned by
/// `S3::read_object` after it has consumed the leading metadata frame.
pub struct S3ReadObjectData {
// Underlying tonic response stream of wire-level chunks.
inner: tonic::Streaming<v1::ReadObjectChunk>,
}
impl S3ReadObjectData {
pub async fn recv(&mut self) -> Result<Option<Vec<u8>>, GestaltError> {
match self.inner.message().await?.map(from_wire_read_object_chunk) {
None => Ok(None),
Some(frame) => match frame.result {
Some(ReadObjectChunkResult::Data(value)) => Ok(Some(value)),
_ => Err(GestaltError::new(
gestalt_error_code::INTERNAL,
"unexpected frame in payload stream",
)),
},
}
}
pub async fn collect(&mut self) -> Result<Vec<u8>, GestaltError> {
let mut out = Vec::new();
while let Some(chunk) = self.recv().await? {
out.extend_from_slice(&chunk);
}
Ok(out)
}
}
/// Handle over every frame of a read-object stream, returned by
/// `S3::read_object_raw`.
pub struct S3ReadObjectStream {
// Underlying tonic response stream of wire-level chunks.
inner: tonic::Streaming<v1::ReadObjectChunk>,
}
impl S3ReadObjectStream {
    /// Receives the next frame converted from its wire form, or `None` once
    /// the stream has ended. Frames are not validated or reordered.
    ///
    /// # Errors
    /// Propagates the error reported by the underlying stream.
    pub async fn recv(&mut self) -> Result<Option<ReadObjectChunk>, GestaltError> {
        let wire = self.inner.message().await?;
        Ok(wire.map(from_wire_read_object_chunk))
    }
}
/// Client wrapper around the generated `v1` S3 object-access gRPC client.
pub struct S3ObjectAccess {
// Generated client running directly over a `tonic::transport::Channel`.
inner: v1::s3_object_access_client::S3ObjectAccessClient<tonic::transport::Channel>,
// Timeout set on each request when present; `None` sets no timeout.
timeout: Option<std::time::Duration>,
}
impl S3ObjectAccess {
    /// Builds a client on top of an existing `tonic` channel. No timeout is
    /// configured.
    pub fn new(channel: tonic::transport::Channel) -> Self {
        let inner = v1::s3_object_access_client::S3ObjectAccessClient::new(channel);
        Self {
            inner,
            timeout: None,
        }
    }

    /// Returns the client with `timeout` stored; it is set on every request
    /// issued afterwards.
    pub fn with_timeout(mut self, timeout: std::time::Duration) -> Self {
        self.timeout = Some(timeout);
        self
    }

    /// Issues a create-object-access-url call built from the individual
    /// arguments and `options`; see [`Self::create_object_access_url_raw`].
    /// The request's `headers` map is always left at its default (empty) on
    /// this path.
    pub async fn create_object_access_url(
        &mut self,
        method: PresignMethod,
        expires_seconds: i64,
        r#ref: Option<S3ObjectRef>,
        options: S3ObjectAccessCreateObjectAccessURLOptions,
    ) -> Result<CreateObjectAccessURLResponse, GestaltError> {
        self.create_object_access_url_raw(CreateObjectAccessURLRequest {
            r#ref,
            method,
            expires_seconds,
            content_type: options.content_type,
            content_disposition: options.content_disposition,
            ..Default::default()
        })
        .await
    }

    /// Sends `request` as a unary call (with the configured timeout, if any)
    /// and converts the wire response.
    ///
    /// # Errors
    /// Propagates the error produced by the underlying RPC.
    pub async fn create_object_access_url_raw(
        &mut self,
        request: CreateObjectAccessURLRequest,
    ) -> Result<CreateObjectAccessURLResponse, GestaltError> {
        let mut call = tonic::Request::new(to_wire_create_object_access_url_request(request));
        if let Some(limit) = self.timeout {
            call.set_timeout(limit);
        }
        let reply = self
            .inner
            .create_object_access_url(call)
            .await?
            .into_inner();
        Ok(from_wire_create_object_access_url_response(reply))
    }
}
/// Optional settings for `S3ObjectAccess::create_object_access_url`; both
/// fields default to empty strings and are copied into the matching
/// `CreateObjectAccessURLRequest` fields.
#[derive(Clone, Debug, Default)]
pub struct S3ObjectAccessCreateObjectAccessURLOptions {
/// Copied into `CreateObjectAccessURLRequest::content_type`.
pub content_type: String,
/// Copied into `CreateObjectAccessURLRequest::content_disposition`.
pub content_disposition: String,
}