use crate::s3::client::MinioClient;
use crate::s3::error::{Error, ValidationErr};
use crate::s3::header_constants::*;
use crate::s3::multimap_ext::{Multimap, MultimapExt};
use crate::s3::response::{DeleteError, DeleteObjectResponse, DeleteObjectsResponse};
use crate::s3::segmented_bytes::SegmentedBytes;
use crate::s3::types::{
BucketName, ListEntry, ObjectKey, Region, S3Api, S3Request, ToS3Request, ToStream, VersionId,
};
use crate::s3::utils::{check_bucket_name, check_object_name, insert, md5sum_hash};
use async_trait::async_trait;
use bytes::Bytes;
use futures_util::stream::iter;
use futures_util::{Stream, StreamExt, stream as futures_stream};
use http::Method;
use std::pin::Pin;
use std::sync::Arc;
use typed_builder::TypedBuilder;
/// Maximum number of objects accepted by a single [`DeleteObjects`] request.
///
/// [`DeleteObjects`] rejects longer lists with `ValidationErr::TooManyDeleteObjects`, and
/// [`DeleteObjectsStreaming`] uses this value as its batch size.
pub const MAX_DELETE_OBJECTS: usize = 1000;
/// Identifies one object to delete: its key and, optionally, a specific version.
///
/// Values are usually produced through the `From` / `TryFrom` conversions implemented
/// for this type (plain keys, `(key, version)` pairs, list entries, delete errors).
#[derive(Debug, Clone, Default, TypedBuilder)]
pub struct ObjectToDelete {
/// Key of the object to delete.
key: ObjectKey,
/// Version to delete; `None` when no version was given.
version_id: Option<VersionId>,
}
impl TryFrom<String> for ObjectToDelete {
type Error = ValidationErr;
fn try_from(key: String) -> Result<Self, Self::Error> {
Ok(Self {
key: ObjectKey::new(key)?,
version_id: None,
})
}
}
impl TryFrom<&str> for ObjectToDelete {
type Error = ValidationErr;
fn try_from(key: &str) -> Result<Self, Self::Error> {
Ok(Self {
key: ObjectKey::new(key)?,
version_id: None,
})
}
}
impl From<ObjectKey> for ObjectToDelete {
fn from(key: ObjectKey) -> Self {
Self {
key,
version_id: None,
}
}
}
impl From<&ObjectKey> for ObjectToDelete {
fn from(key: &ObjectKey) -> Self {
Self {
key: key.clone(),
version_id: None,
}
}
}
impl<V: Into<VersionId>> TryFrom<(String, V)> for ObjectToDelete {
type Error = ValidationErr;
fn try_from((key, version_id): (String, V)) -> Result<Self, Self::Error> {
Ok(Self {
key: ObjectKey::new(key)?,
version_id: Some(version_id.into()),
})
}
}
impl<V: Into<VersionId>> TryFrom<(&str, V)> for ObjectToDelete {
type Error = ValidationErr;
fn try_from((key, version_id): (&str, V)) -> Result<Self, Self::Error> {
Ok(Self {
key: ObjectKey::new(key)?,
version_id: Some(version_id.into()),
})
}
}
impl<V: Into<VersionId>> From<(ObjectKey, V)> for ObjectToDelete {
fn from((key, version_id): (ObjectKey, V)) -> Self {
Self {
key,
version_id: Some(version_id.into()),
}
}
}
impl<V: Into<VersionId>> TryFrom<(String, Option<V>)> for ObjectToDelete {
type Error = ValidationErr;
fn try_from((key, version_id): (String, Option<V>)) -> Result<Self, Self::Error> {
Ok(Self {
key: ObjectKey::new(key)?,
version_id: version_id.map(Into::into),
})
}
}
impl<V: Into<VersionId>> TryFrom<(&str, Option<V>)> for ObjectToDelete {
type Error = ValidationErr;
fn try_from((key, version_id): (&str, Option<V>)) -> Result<Self, Self::Error> {
Ok(Self {
key: ObjectKey::new(key)?,
version_id: version_id.map(Into::into),
})
}
}
impl<V: Into<VersionId>> From<(ObjectKey, Option<V>)> for ObjectToDelete {
fn from((key, version_id): (ObjectKey, Option<V>)) -> Self {
Self {
key,
version_id: version_id.map(Into::into),
}
}
}
impl From<ListEntry> for ObjectToDelete {
fn from(entry: ListEntry) -> Self {
Self {
key: ObjectKey::new_unchecked(entry.name),
version_id: entry.version_id.map(VersionId::new_unchecked),
}
}
}
impl From<DeleteError> for ObjectToDelete {
fn from(entry: DeleteError) -> Self {
Self {
key: ObjectKey::new_unchecked(entry.object_name),
version_id: entry.version_id.map(VersionId::new_unchecked),
}
}
}
/// Arguments for deleting a single object.
///
/// The `ToS3Request` implementation turns this into a `DELETE` request on
/// `bucket` / `object.key`.
#[derive(Debug, Clone, TypedBuilder)]
pub struct DeleteObject {
/// Client the request is built for (no builder default, so it must be set).
#[builder(!default)] client: MinioClient,
/// Extra headers; used as the initial header set of the request.
#[builder(default, setter(into))]
extra_headers: Option<Multimap>,
/// Extra query parameters; used as the initial query parameters of the request.
#[builder(default, setter(into))]
extra_query_params: Option<Multimap>,
/// Region forwarded to the request builder.
#[builder(default, setter(into))]
region: Option<Region>,
/// Bucket holding the object; checked with `check_bucket_name` when the request is built.
#[builder(setter(into))] bucket: BucketName,
/// Object (key plus optional version) to delete; the key is checked with
/// `check_object_name` and the version is added to the query parameters.
#[builder(default, setter(into))]
object: ObjectToDelete,
/// When `true`, the `X_AMZ_BYPASS_GOVERNANCE_RETENTION` header is sent with value `"true"`.
#[builder(default)]
bypass_governance_mode: bool,
}
/// Declares [`DeleteObjectResponse`] as the response type of a [`DeleteObject`] request.
impl S3Api for DeleteObject {
type S3Response = DeleteObjectResponse;
}
/// Builder type-state for [`DeleteObject`] with the client, bucket and object slots filled.
///
/// NOTE(review): the tuple slots presumably follow the struct's field declaration order
/// (client, extra_headers, extra_query_params, region, bucket, object,
/// bypass_governance_mode) — keep in sync with the struct.
pub type DeleteObjectBldr = DeleteObjectBuilder<(
(MinioClient,),
(),
(),
(),
(BucketName,),
(ObjectToDelete,),
(),
)>;
impl ToS3Request for DeleteObject {
fn to_s3request(self) -> Result<S3Request, ValidationErr> {
check_bucket_name(&self.bucket, true)?;
check_object_name(&self.object.key)?;
let mut query_params: Multimap = self.extra_query_params.unwrap_or_default();
query_params.add_version(self.object.version_id);
let mut headers: Multimap = self.extra_headers.unwrap_or_default();
if self.bypass_governance_mode {
headers.add(X_AMZ_BYPASS_GOVERNANCE_RETENTION, "true");
}
Ok(S3Request::builder()
.client(self.client)
.method(Method::DELETE)
.region(self.region)
.bucket(self.bucket)
.object(self.object.key)
.query_params(query_params)
.headers(headers)
.build())
}
}
/// Arguments for deleting several objects of one bucket in a single request.
///
/// The `ToS3Request` implementation serializes `objects` into a `<Delete>` XML body and
/// rejects lists longer than [`MAX_DELETE_OBJECTS`].
#[derive(Clone, Debug, TypedBuilder)]
pub struct DeleteObjects {
/// Client the request is built for (no builder default, so it must be set).
#[builder(!default)] client: MinioClient,
/// Extra headers; used as the initial header set of the request.
#[builder(default, setter(into))]
extra_headers: Option<Multimap>,
/// Extra query parameters; the `delete` parameter is inserted into them.
#[builder(default, setter(into))]
extra_query_params: Option<Multimap>,
/// Region forwarded to the request builder.
#[builder(default, setter(into))]
region: Option<Region>,
/// Bucket holding the objects; checked with `check_bucket_name` when the request is built.
#[builder(setter(into))] bucket: BucketName,
/// Objects to delete (no builder default, so it must be set).
#[builder(!default)]
objects: Vec<ObjectToDelete>,
/// When `true`, the `X_AMZ_BYPASS_GOVERNANCE_RETENTION` header is sent with value `"true"`.
#[builder(default)]
bypass_governance_mode: bool,
/// When `false` (the default), `<Quiet>true</Quiet>` is added to the request body.
#[builder(default)]
verbose_mode: bool,
}
/// Declares [`DeleteObjectsResponse`] as the response type of a [`DeleteObjects`] request.
impl S3Api for DeleteObjects {
type S3Response = DeleteObjectsResponse;
}
/// Builder type-state for [`DeleteObjects`] with the client, bucket and objects slots filled.
///
/// NOTE(review): the tuple slots presumably follow the struct's field declaration order
/// (client, extra_headers, extra_query_params, region, bucket, objects,
/// bypass_governance_mode, verbose_mode) — keep in sync with the struct.
pub type DeleteObjectsBldr = DeleteObjectsBuilder<(
(MinioClient,),
(),
(),
(),
(BucketName,),
(Vec<ObjectToDelete>,),
(),
(),
)>;
impl ToS3Request for DeleteObjects {
    /// Converts the arguments into a multi-object delete request: a `POST` on the bucket
    /// with the `delete` query parameter and a `<Delete>` XML document as body.
    ///
    /// Object keys and version ids are XML-escaped before being written into the body, so
    /// keys containing `&`, `<`, `>` or quotes produce a well-formed document instead of
    /// corrupting it. The `Content-MD5` header is computed over the final (escaped) body.
    ///
    /// # Errors
    /// * the error of `check_bucket_name` if the bucket name is rejected;
    /// * `ValidationErr::TooManyDeleteObjects` if more than [`MAX_DELETE_OBJECTS`] objects
    ///   were supplied.
    fn to_s3request(self) -> Result<S3Request, ValidationErr> {
        // Appends `text` to `out`, replacing the five XML predefined-entity characters.
        // Kept local to this function so the block does not depend on any other helper.
        fn push_xml_escaped(out: &mut String, text: &str) {
            for ch in text.chars() {
                match ch {
                    '&' => out.push_str("&amp;"),
                    '<' => out.push_str("&lt;"),
                    '>' => out.push_str("&gt;"),
                    '"' => out.push_str("&quot;"),
                    '\'' => out.push_str("&apos;"),
                    other => out.push(other),
                }
            }
        }

        check_bucket_name(&self.bucket, true)?;
        if self.objects.len() > MAX_DELETE_OBJECTS {
            return Err(ValidationErr::TooManyDeleteObjects(self.objects.len()));
        }

        let mut data: String = String::from("<Delete>");
        if !self.verbose_mode {
            // Quiet mode is requested unless the caller asked for verbose output.
            data.push_str("<Quiet>true</Quiet>");
        }
        for object in &self.objects {
            data.push_str("<Object>");
            data.push_str("<Key>");
            push_xml_escaped(&mut data, object.key.as_str());
            data.push_str("</Key>");
            if let Some(v) = object.version_id.as_ref() {
                data.push_str("<VersionId>");
                push_xml_escaped(&mut data, v.as_str());
                data.push_str("</VersionId>");
            }
            data.push_str("</Object>");
        }
        data.push_str("</Delete>");
        let bytes: Bytes = data.into();

        let mut headers: Multimap = self.extra_headers.unwrap_or_default();
        if self.bypass_governance_mode {
            headers.add(X_AMZ_BYPASS_GOVERNANCE_RETENTION, "true");
        }
        headers.add(CONTENT_TYPE, "application/xml");
        // The digest must cover exactly the bytes that are sent as the body.
        headers.add(CONTENT_MD5, md5sum_hash(bytes.as_ref()));

        let body = Arc::new(SegmentedBytes::from(bytes));
        Ok(S3Request::builder()
            .client(self.client)
            .method(Method::POST)
            .region(self.region)
            .bucket(self.bucket)
            .query_params(insert(self.extra_query_params, "delete"))
            .headers(headers)
            .body(body)
            .build())
    }
}
/// A boxed, pinned stream of [`ObjectToDelete`] items, used as the input of
/// [`DeleteObjectsStreaming`].
pub struct ObjectsStream {
/// The underlying type-erased stream.
items: Pin<Box<dyn Stream<Item = ObjectToDelete> + Send + Sync>>,
}
impl ObjectsStream {
pub fn from_stream(s: impl Stream<Item = ObjectToDelete> + Send + Sync + 'static) -> Self {
Self { items: Box::pin(s) }
}
}
impl From<ObjectToDelete> for ObjectsStream {
fn from(delete_object: ObjectToDelete) -> Self {
Self::from_stream(iter(std::iter::once(delete_object)))
}
}
impl<I> From<I> for ObjectsStream
where
I: Iterator<Item = ObjectToDelete> + Send + Sync + 'static,
{
fn from(keys: I) -> Self {
Self::from_stream(iter(keys))
}
}
/// Deletes a stream of objects by issuing one [`DeleteObjects`] request per batch of at
/// most [`MAX_DELETE_OBJECTS`] items.
pub struct DeleteObjectsStreaming {
/// Client cloned into every batch request.
client: MinioClient,
/// Bucket all objects belong to.
bucket: BucketName,
/// Remaining objects to delete.
objects: ObjectsStream,
/// Forwarded to every batch request's `bypass_governance_mode`.
bypass_governance_mode: bool,
/// Forwarded to every batch request's `verbose_mode`.
verbose_mode: bool,
/// Cloned into every batch request's `extra_headers`.
extra_headers: Option<Multimap>,
/// Cloned into every batch request's `extra_query_params`.
extra_query_params: Option<Multimap>,
/// Cloned into every batch request's `region`.
region: Option<Region>,
}
impl DeleteObjectsStreaming {
pub fn new(client: MinioClient, bucket: BucketName, objects: impl Into<ObjectsStream>) -> Self {
Self {
client,
bucket,
objects: objects.into(),
bypass_governance_mode: false,
verbose_mode: false,
extra_headers: None,
extra_query_params: None,
region: None,
}
}
pub fn bypass_governance_mode(mut self, bypass_governance_mode: bool) -> Self {
self.bypass_governance_mode = bypass_governance_mode;
self
}
pub fn verbose_mode(mut self, verbose_mode: bool) -> Self {
self.verbose_mode = verbose_mode;
self
}
pub fn extra_headers(mut self, extra_headers: Option<Multimap>) -> Self {
self.extra_headers = extra_headers;
self
}
pub fn extra_query_params(mut self, extra_query_params: Option<Multimap>) -> Self {
self.extra_query_params = extra_query_params;
self
}
pub fn region(mut self, region: Option<Region>) -> Self {
self.region = region;
self
}
async fn next_request(&mut self) -> Result<Option<DeleteObjects>, ValidationErr> {
let mut objects = Vec::new();
while let Some(object) = self.objects.items.next().await {
objects.push(object);
if objects.len() >= MAX_DELETE_OBJECTS {
break;
}
}
if objects.is_empty() {
return Ok(None);
}
Ok(Some(
DeleteObjects::builder()
.client(self.client.clone())
.bucket(&self.bucket)
.objects(objects)
.bypass_governance_mode(self.bypass_governance_mode)
.verbose_mode(self.verbose_mode)
.extra_headers(self.extra_headers.clone())
.extra_query_params(self.extra_query_params.clone())
.region(self.region.clone())
.build(),
))
}
}
#[async_trait]
impl ToStream for DeleteObjectsStreaming {
    type Item = DeleteObjectsResponse;

    /// Turns the streaming delete into a stream of per-batch responses.
    ///
    /// Each poll builds the next batch request and sends it; the stream ends when no
    /// objects are left. A failure to build a batch is yielded as an `Err` item and the
    /// stream continues with the following batch.
    async fn to_stream(
        mut self,
    ) -> Box<dyn Stream<Item = Result<Self::Item, Error>> + Unpin + Send> {
        let responses = futures_stream::unfold(self, |mut state| async move {
            let outcome = match state.next_request().await {
                Ok(Some(request)) => request.send().await,
                Ok(None) => return None,
                Err(e) => Err(e.into()),
            };
            Some((outcome, state))
        });
        Box::new(Box::pin(responses))
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::s3::creds::StaticProvider;
    use crate::s3::http::BaseUrl;

    /// Client pointing at a local endpoint; the tests below only build requests with it.
    fn dummy_client() -> MinioClient {
        let provider = StaticProvider::new("minioadmin", "minioadmin", None);
        let base_url = "http://localhost:9000".parse::<BaseUrl>().unwrap();
        MinioClient::new(base_url, Some(provider), None, None).unwrap()
    }

    /// Unversioned delete target named `obj-<i>`.
    fn dummy_object(i: usize) -> ObjectToDelete {
        let key = ObjectKey::new(format!("obj-{i}")).unwrap();
        ObjectToDelete {
            key,
            version_id: None,
        }
    }

    /// Streaming delete over `count` dummy objects in `test-bucket`.
    fn streaming_over(count: usize) -> DeleteObjectsStreaming {
        let items: Vec<ObjectToDelete> = (0..count).map(dummy_object).collect();
        let bucket = BucketName::new("test-bucket").unwrap();
        DeleteObjectsStreaming::new(dummy_client(), bucket, items.into_iter())
    }

    /// Size of the next batch, or `None` once the stream is exhausted.
    async fn next_batch_len(streaming: &mut DeleteObjectsStreaming) -> Option<usize> {
        streaming
            .next_request()
            .await
            .unwrap()
            .map(|request| request.objects.len())
    }

    #[tokio::test]
    async fn next_request_batches_at_max_delete_objects_boundary() {
        // One object more than a full batch must spill into a second request.
        let mut streaming = streaming_over(MAX_DELETE_OBJECTS + 1);
        assert_eq!(
            next_batch_len(&mut streaming).await,
            Some(MAX_DELETE_OBJECTS)
        );
        assert_eq!(next_batch_len(&mut streaming).await, Some(1));
        assert_eq!(next_batch_len(&mut streaming).await, None);
    }

    #[tokio::test]
    async fn next_request_fits_full_batch_in_one_request() {
        // Exactly one full batch must not produce a trailing empty request.
        let mut streaming = streaming_over(MAX_DELETE_OBJECTS);
        assert_eq!(
            next_batch_len(&mut streaming).await,
            Some(MAX_DELETE_OBJECTS)
        );
        assert_eq!(next_batch_len(&mut streaming).await, None);
    }
}