// object_store_opendal 0.55.0
//
// object_store Integration for Apache OpenDAL
// Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use object_store::MultipartUpload;
use object_store::ObjectStore;
use object_store::PutPayload;
use object_store::path::Path as ObjectStorePath;
use object_store::{Attribute, AttributeValue};

use opendal::raw::oio::MultipartPart;
use opendal::raw::*;
use opendal::*;
use tokio::sync::Mutex;

use super::core::{format_put_multipart_options, format_put_result, parse_op_write};
use super::error::parse_error;

/// OpenDAL multipart writer that forwards all data to an `object_store::ObjectStore`.
pub struct ObjectStoreWriter {
    /// Store that receives the uploaded bytes.
    store: Arc<dyn ObjectStore>,
    /// Destination object path inside `store`.
    path: ObjectStorePath,
    /// OpenDAL write options for this object.
    args: OpWrite,
    /// Handle of the multipart upload currently in progress, or `None` when idle.
    upload: Mutex<Option<Box<dyn MultipartUpload>>>,
}

impl ObjectStoreWriter {
    /// Build a writer targeting `path` inside `store`, using the OpenDAL write
    /// options in `args`.
    ///
    /// The multipart-upload slot starts out empty; nothing is sent to the store
    /// until one of the write methods is invoked.
    pub fn new(store: Arc<dyn ObjectStore + 'static>, path: &str, args: OpWrite) -> Self {
        let object_path = ObjectStorePath::from(path);
        let empty_slot = Mutex::new(None);

        Self {
            store,
            path: object_path,
            args,
            upload: empty_slot,
        }
    }
}

impl oio::MultipartWrite for ObjectStoreWriter {
    /// Write the entire object in one go.
    ///
    /// Used when the object is small enough to bypass multipart upload.
    ///
    /// # Errors
    /// Returns `ErrorKind::Unexpected` if `body` does not contain exactly
    /// `size` bytes, or any error produced by the underlying store.
    async fn write_once(&self, size: u64, body: Buffer) -> Result<Metadata> {
        ensure_body_size(size, &body)?;

        let payload = PutPayload::from(body.to_bytes());
        let mut opts = parse_op_write(&self.args)?;

        // Attach the written size as the `content-size` user-metadata attribute.
        // NOTE(review): presumably read back elsewhere in this crate — verify
        // before removing.
        opts.attributes.insert(
            Attribute::Metadata("content-size".into()),
            AttributeValue::from(size.to_string()),
        );

        let result = self
            .store
            .put_opts(&self.path, payload, opts)
            .await
            .map_err(parse_error)?;

        // Surface the etag / version reported by the store, when present.
        let mut metadata = Metadata::new(EntryMode::FILE);
        if let Some(etag) = &result.e_tag {
            metadata.set_etag(etag);
        }
        if let Some(version) = &result.version {
            metadata.set_version(version);
        }

        Ok(metadata)
    }

    /// Start a new multipart upload on the underlying store.
    ///
    /// `object_store` keeps the remote upload id inside the upload handle, so the
    /// handle is stored in `self.upload` and an empty placeholder id is returned.
    ///
    /// # Errors
    /// Returns `ErrorKind::Unexpected` if an upload is already in progress for
    /// this writer, or any error produced by the underlying store.
    async fn initiate_part(&self) -> Result<String> {
        // Check for an active upload *before* contacting the store. Creating the
        // remote upload first and then bailing out would drop its handle without
        // calling `abort()`, which may leave an incomplete multipart upload behind
        // on the backend. The (async) lock is held across creation so two
        // concurrent callers cannot both pass the check.
        let mut guard = self.upload.lock().await;
        if guard.is_some() {
            return Err(Error::new(
                ErrorKind::Unexpected,
                "Upload already initiated, abort the previous upload first",
            ));
        }

        let opts = parse_op_write(&self.args)?;
        let multipart_opts = format_put_multipart_options(opts);
        let upload = self
            .store
            .put_multipart_opts(&self.path, multipart_opts)
            .await
            .map_err(parse_error)?;
        *guard = Some(upload);

        // object_store does not expose the upload id; the handle stored above
        // already tracks it, so a fixed empty id is sufficient.
        Ok(String::new())
    }

    /// Upload a single part of the active multipart upload.
    ///
    /// The returned `MultipartPart` echoes `part_number` and carries an empty
    /// etag: `object_store` tracks part etags internally and never needs them back.
    ///
    /// # Errors
    /// Returns `ErrorKind::Unexpected` if `body` does not contain exactly `size`
    /// bytes or no upload has been initiated, or any error from the store.
    async fn write_part(
        &self,
        _upload_id: &str,
        part_number: usize,
        size: u64,
        body: Buffer,
    ) -> Result<MultipartPart> {
        ensure_body_size(size, &body)?;

        let payload = PutPayload::from(body.to_bytes());

        // `put_part` assigns the part's position at call time and returns a
        // `'static` future performing the transfer. Only that call needs the
        // lock; awaiting the transfer after releasing it avoids blocking every
        // other caller for the duration of the network I/O.
        // NOTE(review): part order follows the order in which callers reach
        // `put_part`, not `part_number`.
        let pending_part = {
            let mut guard = self.upload.lock().await;
            let upload = guard
                .as_mut()
                .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Upload not initiated"))?;
            upload.put_part(payload)
        };
        pending_part.await.map_err(parse_error)?;

        Ok(MultipartPart {
            part_number,
            // Not used by object_store; see the doc comment above.
            etag: String::new(),
            checksum: None,
        })
    }

    /// Finish the active multipart upload and return the resulting metadata.
    ///
    /// `parts` is only checked for emptiness; `object_store` tracks the uploaded
    /// parts itself. If completion fails, the upload handle is kept so that
    /// `abort_part` can still clean it up.
    ///
    /// # Errors
    /// Returns `ErrorKind::Unexpected` if `parts` is empty or no upload has been
    /// initiated, or any error produced by the store.
    async fn complete_part(
        &self,
        _upload_id: &str,
        parts: &[oio::MultipartPart],
    ) -> Result<Metadata> {
        if parts.is_empty() {
            return Err(Error::new(
                ErrorKind::Unexpected,
                "Cannot complete multipart upload with no parts",
            ));
        }

        let mut guard = self.upload.lock().await;
        let upload = guard
            .as_mut()
            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Upload not initiated"))?;

        let result = upload.complete().await.map_err(parse_error)?;
        // Clear the slot only after a successful completion.
        *guard = None;

        Ok(format_put_result(result))
    }

    /// Abort the active multipart upload.
    ///
    /// If the abort fails, the handle is kept so the abort can be retried.
    ///
    /// # Errors
    /// Returns `ErrorKind::Unexpected` if no upload has been initiated, or any
    /// error produced by the store.
    async fn abort_part(&self, _upload_id: &str) -> Result<()> {
        let mut guard = self.upload.lock().await;
        let upload = guard
            .as_mut()
            .ok_or_else(|| Error::new(ErrorKind::Unexpected, "Upload not initiated"))?;

        upload.abort().await.map_err(parse_error)?;
        *guard = None;

        Ok(())
    }
}

/// Return an `ErrorKind::Unexpected` error unless `body` holds exactly `size` bytes.
fn ensure_body_size(size: u64, body: &Buffer) -> Result<()> {
    let actual_size = body.len() as u64;
    if actual_size != size {
        return Err(Error::new(
            ErrorKind::Unexpected,
            format!("Expected size {size} but got {actual_size}"),
        ));
    }
    Ok(())
}