hdfs_native_object_store/
lib.rs

1//! [object_store::ObjectStore] implementation for the Native Rust HDFS client
2//!
3//! # Usage
4//!
5//! ```rust
6//! use hdfs_native_object_store::HdfsObjectStoreBuilder;
7//! let store = HdfsObjectStoreBuilder::new()
8//!     .with_url("hdfs://localhost:9000")
9//!     .build()
10//!     .unwrap();
11//! ```
12//!
13use std::{
14    collections::HashMap,
15    fmt::{Display, Formatter},
16    future,
17    path::PathBuf,
18};
19
20use async_trait::async_trait;
21use chrono::{DateTime, Utc};
22use futures::{
23    stream::{BoxStream, StreamExt},
24    FutureExt,
25};
26use hdfs_native::{
27    client::FileStatus, file::FileWriter, Client, ClientBuilder, HdfsError, WriteOptions,
28};
29#[allow(deprecated)]
30use object_store::{
31    path::Path, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
32    ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart,
33};
34use tokio::{
35    runtime::Handle,
36    sync::{mpsc, oneshot},
37    task::{self, JoinHandle},
38};
39
40// Re-export minidfs for down-stream integration tests
41#[cfg(feature = "integration-test")]
42pub use hdfs_native::minidfs;
43
44fn generic_error(
45    source: Box<dyn std::error::Error + Send + Sync + 'static>,
46) -> object_store::Error {
47    object_store::Error::Generic {
48        store: "HFDS",
49        source,
50    }
51}
52
53/// Builder for creating an [HdfsObjectStore]
54#[derive(Default)]
55pub struct HdfsObjectStoreBuilder {
56    inner: ClientBuilder,
57}
58
59impl HdfsObjectStoreBuilder {
60    /// Create a new [HdfsObjectStoreBuilder]
61    pub fn new() -> Self {
62        Self::default()
63    }
64
65    /// Set the URL to connect to. Can be the address of a single NameNode, or a logical NameService
66    pub fn with_url(mut self, url: impl Into<String>) -> Self {
67        self.inner = self.inner.with_url(url);
68        self
69    }
70
71    /// Set configs to use for the client. The provided configs will override any found in the default config files loaded
72    pub fn with_config(
73        mut self,
74        config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
75    ) -> Self {
76        self.inner = self.inner.with_config(config);
77        self
78    }
79
80    // Use a dedicated tokio runtime for spawned tasks and IO operations
81    pub fn with_io_runtime(mut self, runtime: Handle) -> Self {
82        self.inner = self.inner.with_io_runtime(runtime);
83        self
84    }
85
86    /// Create the [HdfsObjectStore]] instance from the provided settings
87    pub fn build(self) -> Result<HdfsObjectStore> {
88        let client = self.inner.build().to_object_store_err()?;
89
90        Ok(HdfsObjectStore { client })
91    }
92}
93
94/// Interface for [Hadoop Distributed File System](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html).
95#[derive(Debug, Clone)]
96pub struct HdfsObjectStore {
97    client: Client,
98}
99
100impl HdfsObjectStore {
101    /// Creates a new HdfsObjectStore from an existing `hdfs-native` [Client]
102    ///
103    /// ```rust
104    /// # use std::sync::Arc;
105    /// use hdfs_native::ClientBuilder;
106    /// # use hdfs_native_object_store::HdfsObjectStore;
107    /// let client = ClientBuilder::new().with_url("hdfs://127.0.0.1:9000").build().unwrap();
108    /// let store = HdfsObjectStore::new(client);
109    /// ```
110    pub fn new(client: Client) -> Self {
111        Self { client }
112    }
113
114    /// Creates a new HdfsObjectStore using the specified URL
115    ///
116    /// Connect to a NameNode
117    /// ```rust
118    /// # use hdfs_native_object_store::HdfsObjectStore;
119    /// # fn main() -> object_store::Result<()> {
120    /// let store = HdfsObjectStore::with_url("hdfs://127.0.0.1:9000")?;
121    /// # Ok(())
122    /// # }
123    /// ```
124    #[deprecated(since = "0.15.0", note = "Use HdfsObjectStoreBuilder instead")]
125    pub fn with_url(url: &str) -> Result<Self> {
126        let client = ClientBuilder::new()
127            .with_url(url)
128            .build()
129            .to_object_store_err()?;
130
131        Ok(Self { client })
132    }
133
134    /// Creates a new HdfsObjectStore using the specified URL and Hadoop configs.
135    ///
136    /// Connect to a NameService
137    /// ```rust
138    /// # use hdfs_native_object_store::HdfsObjectStore;
139    /// # use std::collections::HashMap;
140    /// # fn main() -> object_store::Result<()> {
141    /// let config = HashMap::from([
142    ///     ("dfs.ha.namenodes.ns".to_string(), "nn1,nn2".to_string()),
143    ///     ("dfs.namenode.rpc-address.ns.nn1".to_string(), "nn1.example.com:9000".to_string()),
144    ///     ("dfs.namenode.rpc-address.ns.nn2".to_string(), "nn2.example.com:9000".to_string()),
145    /// ]);
146    /// let store = HdfsObjectStore::with_config("hdfs://ns", config)?;
147    /// # Ok(())
148    /// # }
149    /// ```
150    #[deprecated(since = "0.15.0", note = "Use HdfsObjectStoreBuilder instead")]
151    pub fn with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
152        let client = ClientBuilder::new()
153            .with_url(url)
154            .with_config(config)
155            .build()
156            .to_object_store_err()?;
157
158        Ok(Self { client })
159    }
160
161    async fn internal_copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
162        let overwrite = match self.client.get_file_info(&make_absolute_file(to)).await {
163            Ok(_) if overwrite => true,
164            Ok(_) => Err(HdfsError::AlreadyExists(make_absolute_file(to))).to_object_store_err()?,
165            Err(HdfsError::FileNotFound(_)) => false,
166            Err(e) => Err(e).to_object_store_err()?,
167        };
168
169        let write_options = WriteOptions {
170            overwrite,
171            ..Default::default()
172        };
173
174        let file = self
175            .client
176            .read(&make_absolute_file(from))
177            .await
178            .to_object_store_err()?;
179        let mut stream = file.read_range_stream(0, file.file_length()).boxed();
180
181        let mut new_file = self
182            .client
183            .create(&make_absolute_file(to), write_options)
184            .await
185            .to_object_store_err()?;
186
187        while let Some(bytes) = stream.next().await.transpose().to_object_store_err()? {
188            new_file.write(bytes).await.to_object_store_err()?;
189        }
190        new_file.close().await.to_object_store_err()?;
191
192        Ok(())
193    }
194
195    async fn open_tmp_file(&self, file_path: &str) -> Result<(FileWriter, String)> {
196        let path_buf = PathBuf::from(file_path);
197
198        let file_name = path_buf
199            .file_name()
200            .ok_or(HdfsError::InvalidPath("path missing filename".to_string()))
201            .to_object_store_err()?
202            .to_str()
203            .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
204            .to_object_store_err()?
205            .to_string();
206
207        let tmp_file_path = path_buf
208            .with_file_name(format!(".{file_name}.tmp"))
209            .to_str()
210            .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
211            .to_object_store_err()?
212            .to_string();
213
214        // Try to create a file with an incrementing index until we find one that doesn't exist yet
215        let mut index = 1;
216        loop {
217            let path = format!("{tmp_file_path}.{index}");
218            match self.client.create(&path, WriteOptions::default()).await {
219                Ok(writer) => break Ok((writer, path)),
220                Err(HdfsError::AlreadyExists(_)) => index += 1,
221                Err(e) => break Err(e).to_object_store_err(),
222            }
223        }
224    }
225}
226
227impl Display for HdfsObjectStore {
228    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
229        write!(f, "HdfsObjectStore")
230    }
231}
232
233impl From<Client> for HdfsObjectStore {
234    fn from(value: Client) -> Self {
235        Self { client: value }
236    }
237}
238
239#[async_trait]
240impl ObjectStore for HdfsObjectStore {
241    /// Save the provided bytes to the specified location
242    ///
243    /// To make the operation atomic, we write to a temporary file `.{filename}.tmp.{i}` and rename
244    /// on a successful write, where `i` is an integer that is incremented until a non-existent file
245    /// is found.
246    async fn put_opts(
247        &self,
248        location: &Path,
249        payload: PutPayload,
250        opts: PutOptions,
251    ) -> Result<PutResult> {
252        let overwrite = match opts.mode {
253            PutMode::Create => false,
254            PutMode::Overwrite => true,
255            PutMode::Update(_) => {
256                return Err(object_store::Error::NotImplemented);
257            }
258        };
259
260        let final_file_path = make_absolute_file(location);
261
262        // If we're not overwriting, do an upfront check to see if the file already
263        // exists. Otherwise we have to write the whole file and try to rename before
264        // finding out.
265        if !overwrite && self.client.get_file_info(&final_file_path).await.is_ok() {
266            return Err(HdfsError::AlreadyExists(final_file_path)).to_object_store_err();
267        }
268
269        let (mut tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;
270
271        for bytes in payload {
272            tmp_file.write(bytes).await.to_object_store_err()?;
273        }
274        tmp_file.close().await.to_object_store_err()?;
275
276        self.client
277            .rename(&tmp_file_path, &final_file_path, overwrite)
278            .await
279            .to_object_store_err()?;
280
281        let e_tag = self.head(location).await?.e_tag;
282
283        Ok(PutResult {
284            e_tag,
285            version: None,
286        })
287    }
288
289    /// Create a multipart writer that writes to a temporary file in a background task, and renames
290    /// to the final destination on complete.
291    #[allow(deprecated)]
292    async fn put_multipart_opts(
293        &self,
294        location: &Path,
295        _opts: PutMultipartOpts,
296    ) -> Result<Box<dyn MultipartUpload>> {
297        let final_file_path = make_absolute_file(location);
298
299        let (tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;
300
301        Ok(Box::new(HdfsMultipartWriter::new(
302            self.client.clone(),
303            tmp_file,
304            &tmp_file_path,
305            &final_file_path,
306        )))
307    }
308
309    /// Reads data for the specified location.
310    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
311        let meta = self.head(location).await?;
312
313        options.check_preconditions(&meta)?;
314
315        let range = options
316            .range
317            .map(|r| r.as_range(meta.size))
318            .transpose()
319            .map_err(|source| generic_error(source.into()))?
320            .unwrap_or(0..meta.size);
321
322        let reader = self
323            .client
324            .read(&make_absolute_file(location))
325            .await
326            .to_object_store_err()?;
327        let start: usize = range
328            .start
329            .try_into()
330            .expect("unable to convert range.start to usize");
331        let end: usize = range
332            .end
333            .try_into()
334            .expect("unable to convert range.end to usize");
335        let stream = reader
336            .read_range_stream(start, end - start)
337            .map(|b| b.to_object_store_err())
338            .boxed();
339
340        let payload = GetResultPayload::Stream(stream);
341
342        Ok(GetResult {
343            payload,
344            meta,
345            range,
346            attributes: Default::default(),
347        })
348    }
349
350    /// Return the metadata for the specified location
351    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
352        let status = self
353            .client
354            .get_file_info(&make_absolute_file(location))
355            .await
356            .to_object_store_err()?;
357
358        if status.isdir {
359            return Err(object_store::Error::NotFound {
360                path: location.to_string(),
361                source: "Head cannot be called on a directory".into(),
362            });
363        }
364
365        get_object_meta(&status)
366    }
367
368    /// Delete the object at the specified location.
369    async fn delete(&self, location: &Path) -> Result<()> {
370        let result = self
371            .client
372            .delete(&make_absolute_file(location), false)
373            .await
374            .to_object_store_err()?;
375
376        if !result {
377            Err(HdfsError::FileNotFound(location.to_string())).to_object_store_err()?
378        }
379
380        Ok(())
381    }
382
383    /// List all the objects with the given prefix.
384    ///
385    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
386    /// `foo/bar_baz/x`.
387    ///
388    /// Note: the order of returned [`ObjectMeta`] is not guaranteed
389    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
390        let absolute_dir = prefix.map(make_absolute_file).unwrap_or("/".to_string());
391
392        let status_stream = self
393            .client
394            .list_status_iter(&absolute_dir, true)
395            .into_stream()
396            .filter(move |res| {
397                let result = match res {
398                    // Directories aren't a thing in object stores so ignore them, and if a file is listed
399                    // directly that should be ignored as well
400                    Ok(status) => !status.isdir && status.path != absolute_dir,
401                    // Listing by prefix should just return an empty list if the prefix isn't found
402                    Err(HdfsError::FileNotFound(_)) => false,
403                    _ => true,
404                };
405                future::ready(result)
406            })
407            .map(|res| res.map_or_else(|e| Err(e).to_object_store_err(), |s| get_object_meta(&s)));
408
409        Box::pin(status_stream)
410    }
411
412    /// List objects with the given prefix and an implementation specific
413    /// delimiter. Returns common prefixes (directories) in addition to object
414    /// metadata.
415    ///
416    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
417    /// `foo/bar_baz/x`.
418    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
419        let absolute_dir = prefix.map(make_absolute_file).unwrap_or("/".to_string());
420
421        let mut status_stream = self
422            .client
423            .list_status_iter(&absolute_dir, false)
424            .into_stream()
425            .filter(move |res| {
426                let result = match res {
427                    // If a file is listed directly it should be ignored
428                    Ok(status) => status.path != absolute_dir,
429                    // Listing by prefix should just return an empty list if the prefix isn't found
430                    Err(HdfsError::FileNotFound(_)) => false,
431                    _ => true,
432                };
433                future::ready(result)
434            });
435
436        let mut statuses = Vec::<FileStatus>::new();
437        while let Some(status) = status_stream.next().await {
438            statuses.push(status.to_object_store_err()?);
439        }
440
441        let mut dirs: Vec<Path> = Vec::new();
442        for status in statuses.iter().filter(|s| s.isdir) {
443            dirs.push(Path::parse(&status.path)?)
444        }
445
446        let mut files: Vec<ObjectMeta> = Vec::new();
447        for status in statuses.iter().filter(|s| !s.isdir) {
448            files.push(get_object_meta(status)?)
449        }
450
451        Ok(ListResult {
452            common_prefixes: dirs,
453            objects: files,
454        })
455    }
456
457    /// Renames a file. This operation is guaranteed to be atomic.
458    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
459        // Make sure the parent directory exists
460        let mut parent: Vec<_> = to.parts().collect();
461        parent.pop();
462
463        if !parent.is_empty() {
464            let parent_path: Path = parent.into_iter().collect();
465            self.client
466                .mkdirs(&make_absolute_dir(&parent_path), 0o755, true)
467                .await
468                .to_object_store_err()?;
469        }
470
471        Ok(self
472            .client
473            .rename(&make_absolute_file(from), &make_absolute_file(to), true)
474            .await
475            .to_object_store_err()?)
476    }
477
478    /// Renames a file only if the distination doesn't exist. This operation is guaranteed
479    /// to be atomic.
480    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
481        self.client
482            .rename(&make_absolute_file(from), &make_absolute_file(to), false)
483            .await
484            .to_object_store_err()
485    }
486
487    /// Copy an object from one path to another in the same object store.
488    ///
489    /// If there exists an object at the destination, it will be overwritten.
490    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
491        self.internal_copy(from, to, true).await
492    }
493
494    /// Copy an object from one path to another, only if destination is empty.
495    ///
496    /// Will return an error if the destination already has an object.
497    ///
498    /// Performs an atomic operation if the underlying object storage supports it.
499    /// If atomic operations are not supported by the underlying object storage (like S3)
500    /// it will return an error.
501    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
502        self.internal_copy(from, to, false).await
503    }
504}
505
506trait HdfsErrorConvert<T> {
507    fn to_object_store_err(self) -> Result<T>;
508}
509
510impl<T> HdfsErrorConvert<T> for hdfs_native::Result<T> {
511    fn to_object_store_err(self) -> Result<T> {
512        self.map_err(|err| match err {
513            HdfsError::FileNotFound(path) => object_store::Error::NotFound {
514                path: path.clone(),
515                source: Box::new(HdfsError::FileNotFound(path)),
516            },
517            HdfsError::AlreadyExists(path) => object_store::Error::AlreadyExists {
518                path: path.clone(),
519                source: Box::new(HdfsError::AlreadyExists(path)),
520            },
521            _ => object_store::Error::Generic {
522                store: "HdfsObjectStore",
523                source: Box::new(err),
524            },
525        })
526    }
527}
528
529type PartSender = mpsc::UnboundedSender<(oneshot::Sender<Result<()>>, PutPayload)>;
530
531// Create a fake multipart writer the creates an uploader to a temp file as a background
532// task, and submits new parts to be uploaded to a queue for this task.
533// A once cell is used to track whether a part has finished writing or not.
534// On completing, rename the file to the actual target.
535struct HdfsMultipartWriter {
536    client: Client,
537    sender: Option<(JoinHandle<Result<()>>, PartSender)>,
538    tmp_filename: String,
539    final_filename: String,
540}
541
542impl HdfsMultipartWriter {
543    fn new(client: Client, writer: FileWriter, tmp_filename: &str, final_filename: &str) -> Self {
544        let (sender, receiver) = mpsc::unbounded_channel();
545
546        Self {
547            client,
548            sender: Some((Self::start_writer_task(writer, receiver), sender)),
549            tmp_filename: tmp_filename.to_string(),
550            final_filename: final_filename.to_string(),
551        }
552    }
553
554    fn start_writer_task(
555        mut writer: FileWriter,
556        mut part_receiver: mpsc::UnboundedReceiver<(oneshot::Sender<Result<()>>, PutPayload)>,
557    ) -> JoinHandle<Result<()>> {
558        task::spawn(async move {
559            loop {
560                match part_receiver.recv().await {
561                    Some((sender, part)) => {
562                        for bytes in part {
563                            if let Err(e) = writer.write(bytes).await {
564                                let _ = sender.send(Err(e).to_object_store_err());
565                                return Err(generic_error("Failed to write all parts".into()));
566                            }
567                        }
568                        let _ = sender.send(Ok(()));
569                    }
570                    None => return writer.close().await.to_object_store_err(),
571                }
572            }
573        })
574    }
575}
576
577impl std::fmt::Debug for HdfsMultipartWriter {
578    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
579        f.debug_struct("HdfsMultipartWriter")
580            .field("tmp_filename", &self.tmp_filename)
581            .field("final_filename", &self.final_filename)
582            .finish()
583    }
584}
585
586#[async_trait]
587impl MultipartUpload for HdfsMultipartWriter {
588    fn put_part(&mut self, payload: PutPayload) -> UploadPart {
589        let (result_sender, result_receiver) = oneshot::channel();
590
591        if let Some((_, payload_sender)) = self.sender.as_ref() {
592            if let Err(mpsc::error::SendError((result_sender, _))) =
593                payload_sender.send((result_sender, payload))
594            {
595                let _ = result_sender.send(Err(generic_error("Write task failed".into())));
596            }
597        } else {
598            let _ = result_sender.send(Err(generic_error(
599                "Cannot put part after completing or aborting".into(),
600            )));
601        }
602
603        async {
604            result_receiver
605                .await
606                .unwrap_or_else(|_| Err(generic_error("Write task failed".into())))
607        }
608        .boxed()
609    }
610
611    async fn complete(&mut self) -> Result<PutResult> {
612        // Drop the sender so the task knows no more data is coming
613        if let Some((handle, sender)) = self.sender.take() {
614            drop(sender);
615
616            // Wait for the writer task to finish
617            handle.await??;
618
619            self.client
620                .rename(&self.tmp_filename, &self.final_filename, true)
621                .await
622                .to_object_store_err()?;
623
624            Ok(PutResult {
625                e_tag: None,
626                version: None,
627            })
628        } else {
629            Err(generic_error(
630                "Cannot call abort or complete multiple times".into(),
631            ))
632        }
633    }
634
635    async fn abort(&mut self) -> Result<()> {
636        // Drop the sender so the task knows no more data is coming
637        if let Some((handle, sender)) = self.sender.take() {
638            drop(sender);
639
640            // Wait for the writer task to finish
641            handle.abort();
642
643            self.client
644                .delete(&self.tmp_filename, false)
645                .await
646                .to_object_store_err()?;
647
648            Ok(())
649        } else {
650            Err(generic_error(
651                "Cannot call abort or complete multiple times".into(),
652            ))
653        }
654    }
655}
656
657/// ObjectStore paths always remove the leading slash, so add it back
658fn make_absolute_file(path: &Path) -> String {
659    format!("/{}", path.as_ref())
660}
661
662fn make_absolute_dir(path: &Path) -> String {
663    if path.parts().count() > 0 {
664        format!("/{}/", path.as_ref())
665    } else {
666        "/".to_string()
667    }
668}
669
670fn get_etag(status: &FileStatus) -> String {
671    let size = status.length;
672    let mtime = status.modification_time;
673
674    // Use an ETag scheme based on that used by many popular HTTP servers
675    // <https://httpd.apache.org/docs/2.2/mod/core.html#fileetag>
676    // <https://stackoverflow.com/questions/47512043/how-etags-are-generated-and-configured>
677    format!("{mtime:x}-{size:x}")
678}
679
680fn get_object_meta(status: &FileStatus) -> Result<ObjectMeta> {
681    Ok(ObjectMeta {
682        location: Path::parse(&status.path)?,
683        last_modified: DateTime::<Utc>::from_timestamp_millis(status.modification_time as i64)
684            .ok_or(generic_error(
685                "Last modified timestamp out of bounds".into(),
686            ))?,
687        size: status.length as u64,
688        e_tag: Some(get_etag(status)),
689        version: None,
690    })
691}
692
#[cfg(all(test, feature = "integration-test"))]
mod test {
    use std::collections::HashSet;

    use hdfs_native::minidfs::{DfsFeatures, MiniDfs};
    use object_store::integration::*;
    use serial_test::serial;
    use tokio::runtime::Runtime;

    use crate::HdfsObjectStoreBuilder;

    /// Starts a MiniDfs cluster with the HA feature enabled.
    fn start_ha_dfs() -> MiniDfs {
        let features = HashSet::from([DfsFeatures::HA]);
        MiniDfs::with_features(&features)
    }

    /// Runs the `object_store` integration suite against a MiniDfs cluster.
    #[tokio::test]
    #[serial]
    async fn hdfs_test() {
        let dfs = start_ha_dfs();

        let store = HdfsObjectStoreBuilder::new()
            .with_url(&dfs.url)
            .build()
            .unwrap();

        put_get_delete_list(&store).await;
        list_uses_directories_correctly(&store).await;
        list_with_delimiter(&store).await;
        rename_and_copy(&store).await;
        copy_if_not_exists(&store).await;
        multipart_race_condition(&store, true).await;
        multipart_out_of_order(&store).await;
        get_opts(&store).await;
        put_opts(&store, false).await;
    }

    /// Drives the store from a non-tokio executor, first with the default settings and then
    /// with an explicitly provided IO runtime.
    #[test]
    #[serial]
    fn test_no_tokio() {
        let dfs = start_ha_dfs();

        let default_store = HdfsObjectStoreBuilder::new()
            .with_url(&dfs.url)
            .build()
            .unwrap();

        futures::executor::block_on(get_opts(&default_store));

        let rt = Runtime::new().unwrap();

        let runtime_store = HdfsObjectStoreBuilder::new()
            .with_url(&dfs.url)
            .with_io_runtime(rt.handle().clone())
            .build()
            .unwrap();

        futures::executor::block_on(get_opts(&runtime_store));
    }
}