lance_hdfs_provider/lib.rs
1#[doc = include_str!("../README.md")]
2use std::sync::Arc;
3
4use lance_core::Error;
5use object_store::path::Path;
6use object_store_opendal::OpendalStore;
7use opendal::{Operator, services::HdfsNative};
8use snafu::location;
9
10use lance_io::object_store::{
11 DEFAULT_CLOUD_IO_PARALLELISM, ObjectStore, ObjectStoreParams, ObjectStoreProvider,
12 StorageOptions,
13};
14
/// HDFS object store provider for Lance.
///
/// Register this provider under the `hdfs` scheme of an `ObjectStoreRegistry`
/// to open `hdfs://<name-node>/<path>` URIs. The store is built on OpenDAL's
/// `HdfsNative` service; the name node address is taken from the scheme and
/// authority of the URI being opened.
///
/// The type is a stateless unit struct, so it is `Copy` and `Default` in
/// addition to `Clone` and `Debug`.
///
/// # Examples
///
/// ## With lance
/// ```rust,no_run
/// # use std::sync::Arc;
/// # use lance::{
/// #     io::ObjectStoreRegistry, session::Session,
/// #     dataset::{DEFAULT_INDEX_CACHE_SIZE, DEFAULT_METADATA_CACHE_SIZE},
/// # };
/// # use lance_hdfs_provider::HdfsStoreProvider;
/// # use lance::dataset::builder::DatasetBuilder;
/// #
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut registry = ObjectStoreRegistry::default();
/// registry.insert("hdfs", Arc::new(HdfsStoreProvider));
///
/// let session = Arc::new(Session::new(
///     DEFAULT_INDEX_CACHE_SIZE,
///     DEFAULT_METADATA_CACHE_SIZE,
///     Arc::new(registry),
/// ));
///
/// let uri = "hdfs://127.0.0.1:9000/sample-dataset";
/// let _ds = DatasetBuilder::from_uri(uri).with_session(session).load().await?;
/// # Ok(())
/// # }
/// ```
///
/// ## With lancedb
/// ```rust,no_run
/// # use std::sync::Arc;
/// # use lance::{
/// #     io::ObjectStoreRegistry, session::Session,
/// #     dataset::{DEFAULT_INDEX_CACHE_SIZE, DEFAULT_METADATA_CACHE_SIZE},
/// # };
/// # use lance_hdfs_provider::HdfsStoreProvider;
/// #
/// # #[tokio::main(flavor = "current_thread")]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let mut registry = ObjectStoreRegistry::default();
/// registry.insert("hdfs", Arc::new(HdfsStoreProvider));
///
/// let session = Arc::new(Session::new(
///     DEFAULT_INDEX_CACHE_SIZE,
///     DEFAULT_METADATA_CACHE_SIZE,
///     Arc::new(registry),
/// ));
///
/// let db = lancedb::connect("hdfs://127.0.0.1:9000/test-db")
///     .session(session.clone())
///     .execute()
///     .await?;
///
/// let table = db.open_table("table1").execute().await?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug, Clone, Copy, Default)]
pub struct HdfsStoreProvider;
74
75#[async_trait::async_trait]
76impl ObjectStoreProvider for HdfsStoreProvider {
77 async fn new_store(
78 &self,
79 base_path: url::Url,
80 params: &ObjectStoreParams,
81 ) -> Result<ObjectStore, lance_core::Error> {
82 let mut storage_options =
83 StorageOptions(params.storage_options().cloned().unwrap_or_default());
84
85 let download_retry_count = storage_options.download_retry_count();
86 storage_options.0.insert(
87 "name_node".to_string(),
88 format!("{}://{}", base_path.scheme(), base_path.authority()),
89 );
90
91 let operator = Operator::from_iter::<HdfsNative>(storage_options.0.into_iter())
92 .map_err(|e| {
93 Error::invalid_input(
94 format!("Failed to create HDFS native operator: {:?}", e),
95 location!(),
96 )
97 })?
98 .finish();
99
100 let opendal_store = Arc::new(OpendalStore::new(operator));
101 Ok(ObjectStore::new(
102 opendal_store,
103 base_path,
104 params.block_size,
105 None,
106 params.use_constant_size_upload_parts,
107 true,
108 DEFAULT_CLOUD_IO_PARALLELISM,
109 download_retry_count,
110 params.storage_options(),
111 ))
112 }
113
114 fn extract_path(&self, url: &url::Url) -> Result<Path, Error> {
115 if let Ok(file_path) = url.to_file_path() {
116 if let Ok(path) = Path::from_absolute_path(&file_path) {
117 return Ok(path);
118 }
119 }
120
121 Path::parse(url.path()).map_err(|e| {
122 Error::invalid_input(
123 format!("Failed to parse path '{}': {}", url.path(), e),
124 location!(),
125 )
126 })
127 }
128}
#[cfg(test)]
mod tests {
    use super::*;
    use url::Url;

    /// `extract_path` on an `hdfs://` URL yields the URL's path component as
    /// an object store path.
    #[test]
    fn test_hdfs_store_path() {
        let location = Url::parse("hdfs://hdfs-server/path/to/file").unwrap();

        let extracted = HdfsStoreProvider.extract_path(&location).unwrap();

        assert_eq!(extracted, Path::from("/path/to/file"));
    }
}