deltalake_hdfs/
lib.rs

1use std::sync::Arc;
2
3use deltalake_core::logstore::{
4    LogStore, LogStoreFactory, StorageConfig, default_logstore, logstore_factories,
5};
6use deltalake_core::logstore::{ObjectStoreFactory, ObjectStoreRef, object_store_factories};
7use deltalake_core::{DeltaResult, Path};
8use hdfs_native_object_store::HdfsObjectStoreBuilder;
9use url::Url;
10
/// Stateless factory for HDFS-backed stores.
///
/// The type carries no fields; every instance is interchangeable, so it is
/// cheap to clone and can be created with [`Default::default`].
#[derive(Debug, Default, Clone)]
pub struct HdfsFactory {}
13
14impl ObjectStoreFactory for HdfsFactory {
15    fn parse_url_opts(
16        &self,
17        url: &Url,
18        config: &StorageConfig,
19    ) -> DeltaResult<(ObjectStoreRef, Path)> {
20        let mut builder = HdfsObjectStoreBuilder::new()
21            .with_url(url.as_str())
22            .with_config(&config.raw);
23
24        if let Some(runtime) = &config.runtime {
25            builder = builder.with_io_runtime(runtime.get_handle());
26        }
27
28        let store = Arc::new(builder.build()?);
29
30        let prefix = Path::parse(url.path())?;
31        Ok((store, prefix))
32    }
33}
34
35impl LogStoreFactory for HdfsFactory {
36    fn with_options(
37        &self,
38        prefixed_store: ObjectStoreRef,
39        root_store: ObjectStoreRef,
40        location: &Url,
41        options: &StorageConfig,
42    ) -> DeltaResult<Arc<dyn LogStore>> {
43        Ok(default_logstore(
44            prefixed_store,
45            root_store,
46            location,
47            options,
48        ))
49    }
50}
51
52/// Register an [ObjectStoreFactory] for common HDFS [Url] schemes
53pub fn register_handlers(_additional_prefixes: Option<Url>) {
54    let factory = Arc::new(HdfsFactory {});
55    for scheme in ["hdfs", "viewfs"].iter() {
56        let url = Url::parse(&format!("{scheme}://")).unwrap();
57        object_store_factories().insert(url.clone(), factory.clone());
58        logstore_factories().insert(url.clone(), factory.clone());
59    }
60}
61
#[cfg(test)]
mod tests {
    use super::*;

    /// Calls `parse_url_opts` on a plain `hdfs://host:port` URL with the
    /// default storage configuration; the `?` fails the test if it returns
    /// an error.
    #[test]
    fn test_parse_url_opts() -> DeltaResult<()> {
        let factory = HdfsFactory::default();
        let location = Url::parse("hdfs://localhost:9000").expect("Failed to parse hdfs://");
        let storage_config = StorageConfig::default();

        let _ = factory.parse_url_opts(&location, &storage_config)?;
        Ok(())
    }
}