//! minio 0.4.0
//!
//! MinIO SDK for Amazon S3 compatible object storage access.
// MinIO Rust Library for Amazon S3 Compatible Cloud Storage
// Copyright 2025 MinIO, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::s3::builders::{
    ContentStream, MAX_MULTIPART_COUNT, ObjectContent, Size, calc_part_info,
};
use crate::s3::client::MinioClient;
use crate::s3::error::ValidationErr;
use crate::s3::error::{Error, IoError};
use crate::s3::header_constants::*;
use crate::s3::multimap_ext::{Multimap, MultimapExt};
use crate::s3::response::{AppendObjectResponse, StatObjectResponse};
use crate::s3::response_traits::HasObjectSize;
use crate::s3::segmented_bytes::SegmentedBytes;
use crate::s3::sse::Sse;
use crate::s3::types::{BucketName, ObjectKey, Region, S3Api, S3Request, ToS3Request};
use crate::s3::utils::{ChecksumAlgorithm, check_sse, compute_checksum_sb};
use http::Method;
use std::sync::Arc;
use typed_builder::TypedBuilder;
// region: append-object

/// Argument builder for the [`AppendObject`](https://docs.aws.amazon.com/AmazonS3/latest/userguide/directory-buckets-objects-append.html) S3 API operation.
///
/// This struct constructs the parameters required for the [`Client::append_object`](crate::s3::client::MinioClient::append_object) method.
#[derive(Clone, Debug, TypedBuilder)]
pub struct AppendObject {
    /// Client that executes the request.
    #[builder(!default)] // force required
    client: MinioClient,

    /// Extra headers merged into the request; the offset and checksum headers
    /// are added on top of these in `to_s3request`.
    #[builder(default, setter(into))]
    extra_headers: Option<Multimap>,

    /// Extra query parameters appended to the request URL.
    #[builder(default, setter(into))]
    extra_query_params: Option<Multimap>,

    /// Target bucket (required).
    #[builder(setter(into), !default)]
    bucket: BucketName,

    /// Target object key (required).
    #[builder(setter(into), !default)]
    object: ObjectKey,

    /// Optional region for the request.
    #[builder(default, setter(into))]
    region: Option<Region>,

    /// Optional server-side encryption; validated against the client via `check_sse`.
    #[builder(default, setter(into))]
    sse: Option<Arc<dyn Sse>>,

    /// Data to append; sent as the request body (required).
    #[builder(!default)] // force required
    data: Arc<SegmentedBytes>,

    /// Value of `x-amz-write-offset-bytes`.
    #[builder(!default)] // force required
    offset_bytes: u64,

    /// Optional checksum algorithm for data integrity verification during append.
    ///
    /// When specified, computes a checksum of the appended data using the selected algorithm
    /// (CRC32, CRC32C, SHA1, SHA256, or CRC64NVME). The checksum is sent with the append
    /// operation and verified by the server.
    #[builder(default, setter(into))]
    checksum_algorithm: Option<ChecksumAlgorithm>,
}

// Binds `AppendObject` to its response type so the generic `S3Api::send` machinery
// can dispatch the request and parse an `AppendObjectResponse`.
impl S3Api for AppendObject {
    type S3Response = AppendObjectResponse;
}

/// Builder type for [`AppendObject`] that is returned by [`MinioClient::append_object`](crate::s3::client::MinioClient::append_object).
///
/// This type alias simplifies the complex generic signature generated by the `typed_builder` crate.
// Each tuple slot mirrors one `AppendObject` field in declaration order:
// `(T,)` means the field is already set by `append_object`, `()` means still unset.
pub type AppendObjectBldr = AppendObjectBuilder<(
    (MinioClient,),         // client: pre-set
    (),                     // extra_headers: unset
    (),                     // extra_query_params: unset
    (BucketName,),          // bucket: pre-set
    (ObjectKey,),           // object: pre-set
    (),                     // region: unset
    (),                     // sse: unset
    (Arc<SegmentedBytes>,), // data: pre-set
    (u64,),                 // offset_bytes: pre-set
    (),                     // checksum_algorithm: unset
)>;

impl ToS3Request for AppendObject {
    /// Converts this builder into a concrete [`S3Request`] (PUT with the
    /// `x-amz-write-offset-bytes` header and, optionally, checksum headers).
    ///
    /// # Errors
    ///
    /// Returns a [`ValidationErr`] when the configured SSE is incompatible
    /// with the client (see `check_sse`).
    fn to_s3request(self) -> Result<S3Request, ValidationErr> {
        check_sse(&self.sse, &self.client)?;

        // Start from caller-supplied headers, then layer the append-specific ones.
        let mut headers: Multimap = self.extra_headers.unwrap_or_default();
        headers.add(X_AMZ_WRITE_OFFSET_BYTES, self.offset_bytes.to_string());

        if let Some(algorithm) = self.checksum_algorithm {
            let checksum_value = compute_checksum_sb(algorithm, &self.data);
            headers.add(X_AMZ_CHECKSUM_ALGORITHM, algorithm.as_str().to_string());

            // Pick the algorithm-specific checksum header, then add it once.
            let checksum_header = match algorithm {
                ChecksumAlgorithm::CRC32 => X_AMZ_CHECKSUM_CRC32,
                ChecksumAlgorithm::CRC32C => X_AMZ_CHECKSUM_CRC32C,
                ChecksumAlgorithm::SHA1 => X_AMZ_CHECKSUM_SHA1,
                ChecksumAlgorithm::SHA256 => X_AMZ_CHECKSUM_SHA256,
                ChecksumAlgorithm::CRC64NVME => X_AMZ_CHECKSUM_CRC64NVME,
            };
            headers.add(checksum_header, checksum_value);
        }

        let request = S3Request::builder()
            .client(self.client)
            .method(Method::PUT)
            .region(self.region)
            .bucket(self.bucket)
            .query_params(self.extra_query_params.unwrap_or_default())
            .object(self.object)
            .headers(headers)
            .body(self.data)
            .build();
        Ok(request)
    }
}
// endregion: append-object

// region: append-object-content

/// Argument builder for the [`AppendObject`](https://docs.aws.amazon.com/AmazonS3/latest/userguide/directory-buckets-objects-append.html) S3 API operation.
///
/// This struct constructs the parameters required for the [`Client::append_object_content`](crate::s3::client::MinioClient::append_object_content) method.
/// It is High-level API for appending content to an object using multipart uploads.
///
/// `AppendObjectContent` consumes an [`ObjectContent`] stream and transparently appends it to an existing object in MinIO or S3,
/// managing multipart upload details internally.
#[derive(TypedBuilder)]
pub struct AppendObjectContent {
    /// Client that executes the append requests.
    #[builder(!default)] // force required
    client: MinioClient,
    /// Extra headers forwarded to every underlying `AppendObject` request.
    #[builder(default, setter(into))]
    extra_headers: Option<Multimap>,
    /// Extra query parameters forwarded to every underlying request.
    #[builder(default, setter(into))]
    extra_query_params: Option<Multimap>,
    /// Optional region for the requests.
    #[builder(default, setter(into))]
    region: Option<Region>,
    /// Target bucket (required).
    #[builder(setter(into), !default)]
    bucket: BucketName,
    /// Target object key (required).
    #[builder(setter(into), !default)]
    object: ObjectKey,
    /// Optional server-side encryption; validated via `check_sse` in `send`.
    #[builder(default)]
    sse: Option<Arc<dyn Sse>>,
    /// Requested part size; resolved to a concrete size by `calc_part_info` in `send`.
    #[builder(default = Size::Unknown)]
    part_size: Size,
    /// Content to append (required); converted into a stream by `send`.
    #[builder(!default, setter(into))]
    input_content: ObjectContent,
    /// Internal stream state; populated by `send`, not meant to be set by callers.
    #[builder(default = ContentStream::empty())]
    content_stream: ContentStream,
    /// Expected number of parts, when derivable from the content size; set by `send`.
    #[builder(default)]
    part_count: Option<u16>,
    /// Optional checksum algorithm for data integrity verification during append.
    ///
    /// When specified, computes checksums for appended data using the selected algorithm
    /// (CRC32, CRC32C, SHA1, SHA256, or CRC64NVME). The checksum is computed for each
    /// chunk and sent with the append operation.
    #[builder(default, setter(into))]
    checksum_algorithm: Option<ChecksumAlgorithm>,
}

/// Builder type for [`AppendObjectContent`] that is returned by [`MinioClient::append_object_content`](crate::s3::client::MinioClient::append_object_content).
///
/// This type alias simplifies the complex generic signature generated by the `typed_builder` crate.
// Each tuple slot mirrors one `AppendObjectContent` field in declaration order:
// `(T,)` means the field is already set by `append_object_content`, `()` means still unset.
pub type AppendObjectContentBldr = AppendObjectContentBuilder<(
    (MinioClient,),   // client: pre-set
    (),               // extra_headers: unset
    (),               // extra_query_params: unset
    (),               // region: unset
    (BucketName,),    // bucket: pre-set
    (ObjectKey,),     // object: pre-set
    (),               // sse: unset
    (),               // part_size: unset
    (ObjectContent,), // input_content: pre-set
    (),               // content_stream: unset
    (),               // part_count: unset
    (),               // checksum_algorithm: unset
)>;

impl AppendObjectContent {
    /// Streams `input_content` and appends it to the object `bucket`/`object`.
    ///
    /// The content is converted into a [`ContentStream`], a part size and expected part
    /// count are derived via `calc_part_info`, and the object's current length is fetched
    /// with `stat_object` so the first append starts at the correct
    /// `x-amz-write-offset-bytes` offset.
    ///
    /// If everything fits into one part — either a single part is expected, or the total
    /// size is unknown and the stream ends before one full part — a single
    /// [`AppendObject`] request is sent; otherwise the content is appended
    /// part-by-part via `send_mpa`.
    ///
    /// # Errors
    ///
    /// Fails when SSE validation fails, the content stream cannot be read, the
    /// `stat_object` call fails, the stream yields fewer bytes than the declared size
    /// ([`ValidationErr::InsufficientData`]), or any underlying append request fails.
    pub async fn send(mut self) -> Result<AppendObjectResponse, Error> {
        check_sse(&self.sse, &self.client)?;

        self.content_stream = std::mem::take(&mut self.input_content)
            .to_content_stream()
            .await
            .map_err(IoError::from)?;

        // object_size may be Size::Unknown.
        let object_size = self.content_stream.get_size();

        let (part_size, n_expected_parts) = calc_part_info(object_size, self.part_size)?;
        // Set the chosen part size and part count.
        self.part_size = Size::Known(part_size);
        self.part_count = n_expected_parts;

        // Read the first part.
        let seg_bytes = self
            .content_stream
            .read_upto(part_size as usize)
            .await
            .map_err(IoError::from)?;

        // get the length (if any) of the current file; appends must start at its end
        let resp: StatObjectResponse = self
            .client
            .stat_object(&self.bucket, &self.object)?
            .build()
            .send()
            .await?;

        let current_file_size = resp.size()?;

        // In the first part read, if:
        //
        //   - object_size is unknown AND we got less than the part size, OR
        //   - we are expecting only one part to be uploaded,
        //
        // we upload it as a simple put object.
        if (object_size.is_unknown() && (seg_bytes.len() as u64) < part_size)
            || n_expected_parts == Some(1)
        {
            let ao = AppendObject {
                client: self.client,
                extra_headers: self.extra_headers,
                extra_query_params: self.extra_query_params,
                bucket: self.bucket,
                object: self.object,
                region: self.region,
                offset_bytes: current_file_size,
                sse: self.sse,
                data: Arc::new(seg_bytes),
                checksum_algorithm: self.checksum_algorithm,
            };
            ao.send().await
        } else if let Some(expected) = object_size.value()
            && (seg_bytes.len() as u64) < part_size
        {
            // Not enough data! The caller declared `expected` bytes but the stream
            // ended before filling even the first part.
            let got = seg_bytes.len() as u64;
            Err(ValidationErr::InsufficientData { expected, got })?
        } else {
            // Otherwise, we start a multipart append.
            self.send_mpa(part_size, current_file_size, seg_bytes).await
        }
    }

    /// Performs multipart append: issues one `AppendObject` request per chunk of up to
    /// `part_size` bytes, starting at offset `object_size` (the object's current length)
    /// and advancing the offset using the object size reported by each response.
    ///
    /// `first_part` is the chunk already read by `send`; it is consumed on the first
    /// iteration before further chunks are read from the stream. The loop stops after a
    /// short or empty read. Returns the response of the last successful append.
    async fn send_mpa(
        &mut self,
        part_size: u64,
        object_size: u64,
        first_part: SegmentedBytes,
    ) -> Result<AppendObjectResponse, Error> {
        let mut done = false;
        let mut part_number = 0;

        let mut last_resp: Option<AppendObjectResponse> = None;
        let mut next_offset_bytes: u64 = object_size;

        let mut first_part = Some(first_part);
        while !done {
            // Use the pre-read first chunk once, then read subsequent chunks.
            let part_content: SegmentedBytes = {
                if let Some(v) = first_part.take() {
                    v
                } else {
                    self.content_stream
                        .read_upto(part_size as usize)
                        .await
                        .map_err(IoError::from)?
                }
            };
            part_number += 1;
            let buffer_size = part_content.len() as u64;

            // Invariant: read_upto never returns more than the requested size.
            assert!(buffer_size <= part_size, "{buffer_size} <= {part_size}",);

            if buffer_size == 0 && part_number > 1 {
                // We are done as we appended at least 1 part and we have
                // reached the end of the stream.
                break;
            }

            // Check if we have too many parts to upload.
            if self.part_count.is_none() && part_number > MAX_MULTIPART_COUNT {
                return Err(ValidationErr::TooManyParts(part_number as u64).into());
            }

            // Append the part now.
            let append_object = AppendObject::builder()
                .client(self.client.clone())
                .extra_headers(self.extra_headers.clone())
                .extra_query_params(self.extra_query_params.clone())
                .bucket(&self.bucket)
                .object(&self.object)
                .region(self.region.clone())
                .sse(self.sse.clone())
                .data(Arc::new(part_content))
                .offset_bytes(next_offset_bytes)
                .checksum_algorithm(self.checksum_algorithm)
                .build();
            let resp: AppendObjectResponse = append_object.send().await?;

            // The server reports the new total object size; the next part starts there.
            next_offset_bytes = resp.object_size();
            last_resp = Some(resp);

            // Finally check if we are done: a short read means the stream is exhausted.
            if buffer_size < part_size {
                done = true;
            }
        }
        Ok(last_resp.expect("send_mpa always uploads at least one part"))
    }
}
// endregion: append-object-content