1use std::sync::Arc;
2
3use deltalake_core::logstore::{
4 LogStore, LogStoreFactory, StorageConfig, default_logstore, logstore_factories,
5};
6use deltalake_core::logstore::{ObjectStoreFactory, ObjectStoreRef, object_store_factories};
7use deltalake_core::{DeltaResult, Path};
8use hdfs_native_object_store::HdfsObjectStoreBuilder;
9use url::Url;
10
/// Factory type for HDFS-backed storage.
///
/// The struct carries no state; it exists so that the object-store and
/// log-store factory traits can be implemented on it and a shared instance
/// can be placed in the factory registries.
#[derive(Debug, Default, Clone)]
pub struct HdfsFactory {}
13
14impl ObjectStoreFactory for HdfsFactory {
15 fn parse_url_opts(
16 &self,
17 url: &Url,
18 config: &StorageConfig,
19 ) -> DeltaResult<(ObjectStoreRef, Path)> {
20 let mut builder = HdfsObjectStoreBuilder::new()
21 .with_url(url.as_str())
22 .with_config(&config.raw);
23
24 if let Some(runtime) = &config.runtime {
25 builder = builder.with_io_runtime(runtime.get_handle());
26 }
27
28 let store = Arc::new(builder.build()?);
29
30 let prefix = Path::parse(url.path())?;
31 Ok((store, prefix))
32 }
33}
34
35impl LogStoreFactory for HdfsFactory {
36 fn with_options(
37 &self,
38 prefixed_store: ObjectStoreRef,
39 root_store: ObjectStoreRef,
40 location: &Url,
41 options: &StorageConfig,
42 ) -> DeltaResult<Arc<dyn LogStore>> {
43 Ok(default_logstore(
44 prefixed_store,
45 root_store,
46 location,
47 options,
48 ))
49 }
50}
51
52pub fn register_handlers(_additional_prefixes: Option<Url>) {
54 let factory = Arc::new(HdfsFactory {});
55 for scheme in ["hdfs", "viewfs"].iter() {
56 let url = Url::parse(&format!("{scheme}://")).unwrap();
57 object_store_factories().insert(url.clone(), factory.clone());
58 logstore_factories().insert(url.clone(), factory.clone());
59 }
60}
61
#[cfg(test)]
mod tests {
    use super::*;

    /// Smoke test: `parse_url_opts` returns `Ok` for a host-and-port HDFS URL
    /// with default storage configuration. The returned store and path are
    /// discarded; only success is checked.
    #[test]
    fn test_parse_url_opts() -> DeltaResult<()> {
        let location = Url::parse("hdfs://localhost:9000").expect("Failed to parse hdfs://");
        let storage_config = StorageConfig::default();

        HdfsFactory::default()
            .parse_url_opts(&location, &storage_config)
            .map(|_| ())
    }
}