object_store_opendal 0.46.1

object_store Integration for Apache OpenDAL
Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use futures::Stream;
use object_store::ObjectMeta;
use opendal::{Entry, Metadata, Metakey};
use std::future::IntoFuture;

/// Conditionally add the `Send` marker trait for the wrapped type.
/// Only take effect when the `send_wrapper` feature is enabled.
#[cfg(not(feature = "send_wrapper"))]
use noop_wrapper::NoopWrapper as SendWrapper;
#[cfg(feature = "send_wrapper")]
use send_wrapper::SendWrapper;

/// Format `opendal::Error` to `object_store::Error`.
pub fn format_object_store_error(err: opendal::Error, path: &str) -> object_store::Error {
    use opendal::ErrorKind;
    match err.kind() {
        ErrorKind::NotFound => object_store::Error::NotFound {
            path: path.to_string(),
            source: Box::new(err),
        },
        ErrorKind::Unsupported => object_store::Error::NotSupported {
            source: Box::new(err),
        },
        ErrorKind::AlreadyExists => object_store::Error::AlreadyExists {
            path: path.to_string(),
            source: Box::new(err),
        },
        kind => object_store::Error::Generic {
            store: kind.into_static(),
            source: Box::new(err),
        },
    }
}

/// Format `opendal::Metadata` to `object_store::ObjectMeta`.
pub fn format_object_meta(path: &str, meta: &Metadata) -> ObjectMeta {
    let version = match meta.metakey().contains(Metakey::Version) {
        true => meta.version().map(|x| x.to_string()),
        false => None,
    };

    let e_tag = match meta.metakey().contains(Metakey::Etag) {
        true => meta.etag().map(|x| x.to_string()),
        false => None,
    };

    ObjectMeta {
        location: path.into(),
        last_modified: meta.last_modified().unwrap_or_default(),
        size: meta.content_length() as usize,
        e_tag,
        version,
    }
}

/// Try to format `opendal::Entry` to `object_store::ObjectMeta`.
pub async fn try_format_object_meta(
    res: object_store::Result<Entry, opendal::Error>,
) -> object_store::Result<ObjectMeta> {
    let entry = res.map_err(|err| format_object_store_error(err, ""))?;
    let meta = entry.metadata();

    Ok(format_object_meta(entry.path(), meta))
}

/// Make given future `Send`.
pub trait IntoSendFuture {
    type Output;

    fn into_send(self) -> Self::Output;
}

impl<T> IntoSendFuture for T
where
    T: IntoFuture,
{
    type Output = SendWrapper<T::IntoFuture>;

    fn into_send(self) -> Self::Output {
        SendWrapper::new(self.into_future())
    }
}

/// Make given Stream `Send`.
pub trait IntoSendStream {
    type Output;

    fn into_send(self) -> Self::Output;
}

impl<T> IntoSendStream for T
where
    T: Stream,
{
    type Output = SendWrapper<T>;
    fn into_send(self) -> Self::Output {
        SendWrapper::new(self)
    }
}

#[cfg(not(feature = "send_wrapper"))]
mod noop_wrapper {
    use std::pin::Pin;
    use std::task::Context;
    use std::task::Poll;

    use futures::Future;
    use futures::Stream;
    use pin_project::pin_project;

    #[pin_project]
    pub struct NoopWrapper<T> {
        #[pin]
        item: T,
    }

    impl<T> Future for NoopWrapper<T>
    where
        T: Future,
    {
        type Output = T::Output;

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let this = self.project();
            this.item.poll(cx)
        }
    }

    impl<T> Stream for NoopWrapper<T>
    where
        T: Stream,
    {
        type Item = T::Item;

        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            let this = self.project();
            this.item.poll_next(cx)
        }
    }

    impl<T> NoopWrapper<T> {
        pub fn new(item: T) -> Self {
            Self { item }
        }
    }
}