Skip to main content

lance_hdfs_provider/
lib.rs

1#[doc = include_str!("../README.md")]
2use std::sync::Arc;
3
4use lance_core::Error;
5use object_store::path::Path;
6use object_store_opendal::OpendalStore;
7use opendal::{Operator, services::HdfsNative};
8use snafu::location;
9
10use lance_io::object_store::{
11    DEFAULT_CLOUD_IO_PARALLELISM, ObjectStore, ObjectStoreParams, ObjectStoreProvider,
12    StorageOptions,
13};
14
/// HDFS object store provider for Lance.
///
/// Implements [`ObjectStoreProvider`] on top of OpenDAL's native HDFS service
/// (`HdfsNative`). Register it for the `hdfs` scheme so that `hdfs://` URIs can
/// be opened through Lance or LanceDB.
///
/// The `name_node` option handed to OpenDAL is always derived from the URI's
/// scheme and authority: for `hdfs://127.0.0.1:9000/sample-dataset` it is
/// `hdfs://127.0.0.1:9000`, overriding any user-supplied `name_node`. All other
/// storage options are passed through to the OpenDAL operator unchanged.
///
/// # Examples
///
/// ## With Lance
///
/// ```rust,no_run
/// # use std::sync::Arc;
/// # use lance::dataset::builder::DatasetBuilder;
/// # use lance::dataset::{DEFAULT_INDEX_CACHE_SIZE, DEFAULT_METADATA_CACHE_SIZE};
/// # use lance::{io::ObjectStoreRegistry, session::Session};
/// # use lance_hdfs_provider::HdfsStoreProvider;
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut registry = ObjectStoreRegistry::default();
/// registry.insert("hdfs", Arc::new(HdfsStoreProvider));
///
/// let session = Arc::new(Session::new(
///     DEFAULT_INDEX_CACHE_SIZE,
///     DEFAULT_METADATA_CACHE_SIZE,
///     Arc::new(registry),
/// ));
///
/// let uri = "hdfs://127.0.0.1:9000/sample-dataset";
/// let _ds = DatasetBuilder::from_uri(uri).with_session(session).load().await?;
/// # Ok(())
/// # }
/// ```
///
/// ## With LanceDB
///
/// ```rust,no_run
/// # use std::sync::Arc;
/// # use lance::dataset::{DEFAULT_INDEX_CACHE_SIZE, DEFAULT_METADATA_CACHE_SIZE};
/// # use lance::{io::ObjectStoreRegistry, session::Session};
/// # use lance_hdfs_provider::HdfsStoreProvider;
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut registry = ObjectStoreRegistry::default();
/// registry.insert("hdfs", Arc::new(HdfsStoreProvider));
///
/// let session = Arc::new(Session::new(
///     DEFAULT_INDEX_CACHE_SIZE,
///     DEFAULT_METADATA_CACHE_SIZE,
///     Arc::new(registry),
/// ));
///
/// let db = lancedb::connect("hdfs://127.0.0.1:9000/test-db")
///     .session(session.clone())
///     .execute()
///     .await?;
///
/// let _table = db.open_table("table1").execute().await?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone, Copy, Default)]
pub struct HdfsStoreProvider;
74
75#[async_trait::async_trait]
76impl ObjectStoreProvider for HdfsStoreProvider {
77    async fn new_store(
78        &self,
79        base_path: url::Url,
80        params: &ObjectStoreParams,
81    ) -> Result<ObjectStore, lance_core::Error> {
82        let mut storage_options =
83            StorageOptions(params.storage_options().cloned().unwrap_or_default());
84
85        let download_retry_count = storage_options.download_retry_count();
86        storage_options.0.insert(
87            "name_node".to_string(),
88            format!("{}://{}", base_path.scheme(), base_path.authority()),
89        );
90
91        let operator = Operator::from_iter::<HdfsNative>(storage_options.0.into_iter())
92            .map_err(|e| {
93                Error::invalid_input(
94                    format!("Failed to create HDFS native operator: {:?}", e),
95                    location!(),
96                )
97            })?
98            .finish();
99
100        let opendal_store = Arc::new(OpendalStore::new(operator));
101        Ok(ObjectStore::new(
102            opendal_store,
103            base_path,
104            params.block_size,
105            None,
106            params.use_constant_size_upload_parts,
107            true,
108            DEFAULT_CLOUD_IO_PARALLELISM,
109            download_retry_count,
110            params.storage_options(),
111        ))
112    }
113
114    fn extract_path(&self, url: &url::Url) -> Result<Path, Error> {
115        if let Ok(file_path) = url.to_file_path() {
116            if let Ok(path) = Path::from_absolute_path(&file_path) {
117                return Ok(path);
118            }
119        }
120
121        Path::parse(url.path()).map_err(|e| {
122            Error::invalid_input(
123                format!("Failed to parse path '{}': {}", url.path(), e),
124                location!(),
125            )
126        })
127    }
128}
#[cfg(test)]
mod tests {
    use url::Url;

    use super::*;

    /// Parses `uri` and runs it through `extract_path`, panicking on any error.
    fn extract(uri: &str) -> Path {
        let url = Url::parse(uri).unwrap();
        HdfsStoreProvider.extract_path(&url).unwrap()
    }

    #[test]
    fn test_hdfs_store_path() {
        assert_eq!(extract("hdfs://hdfs-server/path/to/file"), Path::from("/path/to/file"));
    }

    #[test]
    fn test_hdfs_store_path_with_port() {
        // Same URI shape as the doc examples: the host and port must not leak
        // into the object-store path.
        assert_eq!(extract("hdfs://127.0.0.1:9000/sample-dataset"), Path::from("sample-dataset"));
    }

    #[test]
    fn test_hdfs_store_path_rejects_empty_segment() {
        // `Path::parse` refuses empty segments such as the one in `path//file`.
        let url = Url::parse("hdfs://hdfs-server/path//file").unwrap();
        assert!(HdfsStoreProvider.extract_path(&url).is_err());
    }
}