lance-io 6.0.0

I/O utilities for Lance
Documentation
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;

use futures::{StreamExt, TryStreamExt, stream, stream::BoxStream};
use object_store::path::Path;
use object_store::{
    GetOptions, GetResult, ListResult, MultipartUpload, ObjectMeta, ObjectStore as OSObjectStore,
    PutMultipartOptions, PutOptions, PutPayload, PutResult,
};
use object_store_opendal::OpendalStore;
use tokio::sync::RwLock;

use crate::object_store::StorageOptionsAccessor;
use lance_core::Result;

/// Function that turns the merged storage options into a normalized configuration.
///
/// The result is used both as the cache key compared in
/// `DynamicOpenDalStore::current_store` and as the input to [`BuildStoreFn`].
type NormalizeConfigFn = fn(&HashMap<String, String>) -> Result<HashMap<String, String>>;
/// Function that builds an [`OpendalStore`] from a normalized configuration.
type BuildStoreFn = fn(HashMap<String, String>) -> Result<OpendalStore>;

/// A built store paired with the normalized configuration it was built from.
///
/// `DynamicOpenDalStore::current_store` compares `config` with the freshly
/// normalized configuration to decide whether `store` can be handed out again.
#[derive(Debug, Clone)]
struct CachedOpenDalStore {
    /// Normalized configuration that was passed to the builder for `store`.
    config: HashMap<String, String>,
    /// Store returned to callers while the normalized configuration is equal.
    store: Arc<OpendalStore>,
}

/// Object store that resolves its backing [`OpendalStore`] on every operation.
///
/// Each operation fetches storage options from `accessor`, layers them over
/// `base_options`, normalizes the result, and then either reuses the cached
/// store (when the normalized configuration is unchanged) or builds a new one.
/// Clones share the same cache because it lives behind an `Arc`.
#[derive(Clone)]
pub(in crate::object_store) struct DynamicOpenDalStore {
    /// Label printed by the `Display` and `Debug` implementations.
    name: Arc<str>,
    /// Options that the dynamically fetched options are merged on top of.
    base_options: Arc<HashMap<String, String>>,
    /// Source of the dynamic storage options, queried on every `current_store` call.
    accessor: Arc<StorageOptionsAccessor>,
    /// Converts merged options into the configuration used as cache key and builder input.
    normalize_config: NormalizeConfigFn,
    /// Builds a store from a normalized configuration.
    build_store: BuildStoreFn,
    /// Keys removed from the dynamic options before merging, so the base value is kept.
    protected_keys: Vec<&'static str>,
    /// Most recently built store together with the configuration that produced it.
    cache: Arc<RwLock<Option<CachedOpenDalStore>>>,
}

impl fmt::Debug for DynamicOpenDalStore {
    /// Formats the store for diagnostics, printing only `name` and `accessor`.
    ///
    /// The option maps, function pointers, protected keys and cache are not
    /// printed (NOTE(review): presumably because option maps may hold
    /// credentials — confirm). The output therefore ends with `..` so readers
    /// can tell that fields were left out on purpose.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DynamicOpenDalStore")
            .field("name", &self.name)
            .field("accessor", &self.accessor)
            .finish_non_exhaustive()
    }
}

impl DynamicOpenDalStore {
    /// Creates a dynamic store with no protected keys and an empty cache.
    ///
    /// * `name` - label used when the store is displayed.
    /// * `base_options` - options the dynamic options are merged on top of.
    /// * `accessor` - source of the dynamic storage options.
    /// * `normalize_config` - converts merged options into the cache key / builder input.
    /// * `build_store` - builds the backing store from a normalized configuration.
    pub(in crate::object_store) fn new(
        name: impl Into<Arc<str>>,
        base_options: HashMap<String, String>,
        accessor: Arc<StorageOptionsAccessor>,
        normalize_config: NormalizeConfigFn,
        build_store: BuildStoreFn,
    ) -> Self {
        let name = name.into();
        let base_options = Arc::new(base_options);
        let cache = Arc::new(RwLock::new(None));
        Self {
            name,
            base_options,
            accessor,
            normalize_config,
            build_store,
            protected_keys: Vec::new(),
            cache,
        }
    }

    /// Replaces the protected key list with `keys` and returns the updated store.
    ///
    /// Protected keys are stripped from the dynamic options before merging, so
    /// the value from the base options (if any) is the one that survives.
    pub(in crate::object_store) fn with_protected_keys(
        mut self,
        keys: impl IntoIterator<Item = &'static str>,
    ) -> Self {
        self.protected_keys = Vec::from_iter(keys);
        self
    }

    /// Layers `dynamic_options` over a copy of the base options.
    ///
    /// Entries whose key is protected are discarded from `dynamic_options`
    /// first; every remaining entry overrides the base entry with the same key.
    fn merge_options(
        &self,
        dynamic_options: HashMap<String, String>,
    ) -> HashMap<String, String> {
        let overrides = dynamic_options.into_iter().filter(|(key, _)| {
            !self
                .protected_keys
                .iter()
                .any(|protected| *protected == key.as_str())
        });

        let mut merged = (*self.base_options).clone();
        merged.extend(overrides);
        merged
    }

    /// Returns the store matching the current storage options.
    ///
    /// The options are fetched from the accessor, merged with the base options
    /// and normalized. If the cache holds a store built from an equal
    /// configuration it is returned; otherwise a new store is built, cached and
    /// returned.
    ///
    /// # Errors
    ///
    /// Propagates any error from fetching the options, from the normalizer, or
    /// from the store builder.
    pub(in crate::object_store) async fn current_store(&self) -> Result<Arc<OpendalStore>> {
        let dynamic_options = self.accessor.get_storage_options().await?.0;
        let merged_options = self.merge_options(dynamic_options);
        let config = (self.normalize_config)(&merged_options)?;

        // A cached store is reused only when the normalized config is exactly
        // equal. Providers should therefore emit stable, canonical values for
        // configurations that mean the same thing, or stores get rebuilt
        // needlessly.
        {
            let guard = self.cache.read().await;
            let hit = guard.as_ref().filter(|entry| entry.config == config);
            if let Some(entry) = hit {
                return Ok(Arc::clone(&entry.store));
            }
        }

        // The store is built before the write lock is requested. Another caller
        // may have cached an equal configuration in the meantime, so the cache
        // is checked again under the write lock; in that case the cached store
        // wins and the freshly built one is dropped.
        let fresh = Arc::new((self.build_store)(config.clone())?);
        let mut guard = self.cache.write().await;
        match guard.as_ref() {
            Some(entry) if entry.config == config => Ok(Arc::clone(&entry.store)),
            _ => {
                *guard = Some(CachedOpenDalStore {
                    config,
                    store: Arc::clone(&fresh),
                });
                Ok(fresh)
            }
        }
    }

    /// Wraps a `lance_core` error as a generic `object_store` error attributed
    /// to `"DynamicOpenDalStore"`.
    fn map_store_error(&self, error: lance_core::Error) -> object_store::Error {
        let source = Box::new(error);
        object_store::Error::Generic {
            store: "DynamicOpenDalStore",
            source,
        }
    }
}

impl fmt::Display for DynamicOpenDalStore {
    /// Writes the store as `DynamicOpenDalStore(<name>)`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { name, .. } = self;
        write!(f, "DynamicOpenDalStore({name})")
    }
}

// Every method first resolves the store for the current storage options via
// `current_store` (failures are converted with `map_store_error`) and then
// forwards the call unchanged to that store.
#[async_trait::async_trait]
impl OSObjectStore for DynamicOpenDalStore {
    /// Forwards `put_opts` to the currently resolved store.
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: PutOptions,
    ) -> object_store::Result<PutResult> {
        let backend = self
            .current_store()
            .await
            .map_err(|err| self.map_store_error(err))?;
        backend.put_opts(location, payload, opts).await
    }

    /// Forwards `put_multipart_opts` to the currently resolved store.
    async fn put_multipart_opts(
        &self,
        location: &Path,
        opts: PutMultipartOptions,
    ) -> object_store::Result<Box<dyn MultipartUpload>> {
        let backend = self
            .current_store()
            .await
            .map_err(|err| self.map_store_error(err))?;
        backend.put_multipart_opts(location, opts).await
    }

    /// Forwards `get_opts` to the currently resolved store.
    async fn get_opts(
        &self,
        location: &Path,
        options: GetOptions,
    ) -> object_store::Result<GetResult> {
        let backend = self
            .current_store()
            .await
            .map_err(|err| self.map_store_error(err))?;
        backend.get_opts(location, options).await
    }

    /// Forwards `delete` to the currently resolved store.
    async fn delete(&self, location: &Path) -> object_store::Result<()> {
        let backend = self
            .current_store()
            .await
            .map_err(|err| self.map_store_error(err))?;
        backend.delete(location).await
    }

    /// Lists objects under `prefix` using the currently resolved store.
    ///
    /// The store is resolved lazily, when the returned stream is first polled.
    /// A resolution failure is yielded as the stream's only item.
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, object_store::Result<ObjectMeta>> {
        // Owned copies are needed because the returned stream must be `'static`.
        let owner = self.clone();
        let prefix = prefix.cloned();
        let resolve = async move {
            let backend = owner
                .current_store()
                .await
                .map_err(|err| owner.map_store_error(err))?;
            Ok::<_, object_store::Error>(backend.list(prefix.as_ref()))
        };
        stream::once(resolve).try_flatten().boxed()
    }

    /// Forwards `list_with_delimiter` to the currently resolved store.
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> object_store::Result<ListResult> {
        let backend = self
            .current_store()
            .await
            .map_err(|err| self.map_store_error(err))?;
        backend.list_with_delimiter(prefix).await
    }

    /// Forwards `copy` to the currently resolved store.
    async fn copy(&self, from: &Path, to: &Path) -> object_store::Result<()> {
        let backend = self
            .current_store()
            .await
            .map_err(|err| self.map_store_error(err))?;
        backend.copy(from, to).await
    }

    /// Forwards `copy_if_not_exists` to the currently resolved store.
    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
        let backend = self
            .current_store()
            .await
            .map_err(|err| self.map_store_error(err))?;
        backend.copy_if_not_exists(from, to).await
    }

    /// Forwards `rename` to the currently resolved store.
    async fn rename(&self, from: &Path, to: &Path) -> object_store::Result<()> {
        let backend = self
            .current_store()
            .await
            .map_err(|err| self.map_store_error(err))?;
        backend.rename(from, to).await
    }

    /// Forwards `rename_if_not_exists` to the currently resolved store.
    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> object_store::Result<()> {
        let backend = self
            .current_store()
            .await
            .map_err(|err| self.map_store_error(err))?;
        backend.rename_if_not_exists(from, to).await
    }
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use opendal::{Operator, services::Memory};

    use super::*;
    use crate::object_store::test_utils::StaticMockStorageOptionsProvider;

    /// Builds an owned option map from borrowed key/value pairs.
    fn options<const N: usize>(pairs: [(&str, &str); N]) -> HashMap<String, String> {
        pairs
            .into_iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }

    /// Accessor backed by a provider that always reports `options`.
    fn static_accessor(options: HashMap<String, String>) -> Arc<StorageOptionsAccessor> {
        Arc::new(StorageOptionsAccessor::with_provider(Arc::new(
            StaticMockStorageOptionsProvider { options },
        )))
    }

    /// Normalizer for the tests: returns the merged options unchanged.
    fn identity_config(
        options: &HashMap<String, String>,
    ) -> lance_core::Result<HashMap<String, String>> {
        Ok(options.clone())
    }

    /// Builder for the tests: ignores the config and creates an in-memory store.
    fn memory_store(_config: HashMap<String, String>) -> lance_core::Result<OpendalStore> {
        let operator = Operator::new(Memory::default()).map_err(|e| {
            lance_core::Error::invalid_input(format!("Failed to create memory operator: {e:?}"))
        })?;
        Ok(OpendalStore::new(operator.finish()))
    }

    /// Two lookups with an unchanged configuration must yield the same store instance.
    #[tokio::test]
    async fn test_dynamic_store_caches_by_normalized_config() {
        let store = DynamicOpenDalStore::new(
            "memory",
            HashMap::new(),
            static_accessor(options([("token", "value")])),
            identity_config,
            memory_store,
        );

        let first = store
            .current_store()
            .await
            .expect("first store should build");
        let second = store
            .current_store()
            .await
            .expect("second store should reuse cache");

        assert!(Arc::ptr_eq(&first, &second));
    }

    /// Protected keys keep their base value; other keys take the provider value.
    #[test]
    fn test_merge_options_preserves_protected_base_keys() {
        let base = options([
            ("bucket", "url-bucket"),
            ("root", "/"),
            ("token", "base-token"),
        ]);
        let from_provider = options([
            ("bucket", "provider-bucket"),
            ("root", "/provider-root"),
            ("token", "provider-token"),
        ]);

        let store = DynamicOpenDalStore::new(
            "memory",
            base,
            static_accessor(HashMap::new()),
            identity_config,
            memory_store,
        )
        .with_protected_keys(["bucket", "root"]);

        let merged = store.merge_options(from_provider);

        assert_eq!(merged.get("bucket").map(String::as_str), Some("url-bucket"));
        assert_eq!(merged.get("root").map(String::as_str), Some("/"));
        assert_eq!(
            merged.get("token").map(String::as_str),
            Some("provider-token")
        );
    }
}