object_store_opendal 0.55.0

object_store Integration for Apache OpenDAL
Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::sync::Arc;

use bytes::Bytes;
use futures::TryStreamExt;
use futures::stream::BoxStream;
use object_store::ObjectStore;
use object_store::path::Path as ObjectStorePath;

use opendal::raw::*;
use opendal::*;

use super::core::parse_op_read;
use super::error::parse_error;

/// ObjectStore reader
pub struct ObjectStoreReader {
    bytes_stream: BoxStream<'static, object_store::Result<Bytes>>,
    meta: object_store::ObjectMeta,
    args: OpRead,
}

impl ObjectStoreReader {
    pub(crate) async fn new(
        store: Arc<dyn ObjectStore + 'static>,
        path: &str,
        args: OpRead,
    ) -> Result<Self> {
        let path = ObjectStorePath::from(path);
        let opts = parse_op_read(&args)?;
        let result = store.get_opts(&path, opts).await.map_err(parse_error)?;
        let meta = result.meta.clone();
        let bytes_stream = result.into_stream();
        Ok(Self {
            bytes_stream,
            meta,
            args,
        })
    }

    pub(crate) fn rp(&self) -> RpRead {
        let mut rp = RpRead::new().with_size(Some(self.meta.size));
        if !self.args.range().is_full() {
            let range = self.args.range();
            let size = match range.size() {
                Some(size) => size,
                None => self.meta.size,
            };
            rp = rp.with_range(Some(
                BytesContentRange::default().with_range(range.offset(), range.offset() + size - 1),
            ));
        }
        rp
    }
}

// ObjectStoreReader is safe to share between threads, because the `read()` method requires
// `&mut self` and `rp()` is stateless.
unsafe impl Sync for ObjectStoreReader {}

impl oio::Read for ObjectStoreReader {
    async fn read(&mut self) -> Result<Buffer> {
        let bs = self.bytes_stream.try_next().await.map_err(parse_error)?;
        Ok(bs.map(Buffer::from).unwrap_or_default())
    }
}