hdfs_native_object_store/
lib.rs

//! [object_store::ObjectStore] implementation for the Native Rust HDFS client
//!
//! # Usage
//!
//! ```rust
//! use hdfs_native_object_store::HdfsObjectStoreBuilder;
//! let store = HdfsObjectStoreBuilder::new()
//!     .with_url("hdfs://localhost:9000")
//!     .build()
//!     .unwrap();
//! ```
//!
use std::{
    collections::HashMap,
    fmt::{Display, Formatter},
    future,
    path::PathBuf,
};

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::{
    FutureExt,
    stream::{BoxStream, StreamExt},
};
use hdfs_native::{
    Client, ClientBuilder, HdfsError, WriteOptions, client::FileStatus, file::FileWriter,
};
use object_store::{CopyMode, CopyOptions, RenameOptions, RenameTargetMode};
#[allow(deprecated)]
use object_store::{
    GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
    PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart, path::Path,
};
use tokio::{
    runtime::Handle,
    sync::{mpsc, oneshot},
    task::{self, JoinHandle},
};

// Re-export minidfs for downstream integration tests
#[cfg(feature = "integration-test")]
pub use hdfs_native::minidfs;

fn generic_error(
    source: Box<dyn std::error::Error + Send + Sync + 'static>,
) -> object_store::Error {
    object_store::Error::Generic {
        store: "HDFS",
        source,
    }
}

/// Builder for creating an [HdfsObjectStore]
#[derive(Default)]
pub struct HdfsObjectStoreBuilder {
    inner: ClientBuilder,
}

impl HdfsObjectStoreBuilder {
    /// Create a new [HdfsObjectStoreBuilder]
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the URL to connect to. Can be the address of a single NameNode, or a logical NameService
    pub fn with_url(mut self, url: impl Into<String>) -> Self {
        self.inner = self.inner.with_url(url);
        self
    }

    /// Set configs to use for the client. The provided configs override any values loaded from the default config files
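    ///
    /// A minimal sketch connecting to an HA NameService via explicit configs
    /// (the service name and hostnames are illustrative):
    /// ```rust,no_run
    /// # use hdfs_native_object_store::HdfsObjectStoreBuilder;
    /// let store = HdfsObjectStoreBuilder::new()
    ///     .with_url("hdfs://ns") // hypothetical NameService
    ///     .with_config([
    ///         ("dfs.ha.namenodes.ns", "nn1,nn2"),
    ///         ("dfs.namenode.rpc-address.ns.nn1", "nn1.example.com:9000"),
    ///         ("dfs.namenode.rpc-address.ns.nn2", "nn2.example.com:9000"),
    ///     ])
    ///     .build()
    ///     .unwrap();
    /// ```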
    pub fn with_config(
        mut self,
        config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
    ) -> Self {
        self.inner = self.inner.with_config(config);
        self
    }

    /// Use a dedicated tokio runtime for spawned tasks and IO operations
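    ///
    /// A minimal sketch of isolating IO on its own runtime (the URL and
    /// thread count are illustrative):
    /// ```rust,no_run
    /// # use hdfs_native_object_store::HdfsObjectStoreBuilder;
    /// let io_runtime = tokio::runtime::Builder::new_multi_thread()
    ///     .worker_threads(2) // illustrative sizing
    ///     .enable_all()
    ///     .build()
    ///     .unwrap();
    /// let store = HdfsObjectStoreBuilder::new()
    ///     .with_url("hdfs://localhost:9000")
    ///     .with_io_runtime(io_runtime.handle().clone())
    ///     .build()
    ///     .unwrap();
    /// ```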
    pub fn with_io_runtime(mut self, runtime: Handle) -> Self {
        self.inner = self.inner.with_io_runtime(runtime);
        self
    }

    /// Create the [HdfsObjectStore] instance from the provided settings
    pub fn build(self) -> Result<HdfsObjectStore> {
        let client = self.inner.build().to_object_store_err()?;

        Ok(HdfsObjectStore { client })
    }
}

/// Interface for [Hadoop Distributed File System](https://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html).
#[derive(Debug, Clone)]
pub struct HdfsObjectStore {
    client: Client,
}

impl HdfsObjectStore {
    /// Creates a new HdfsObjectStore from an existing `hdfs-native` [Client]
    ///
    /// ```rust
    /// use hdfs_native::ClientBuilder;
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// let client = ClientBuilder::new().with_url("hdfs://127.0.0.1:9000").build().unwrap();
    /// let store = HdfsObjectStore::new(client);
    /// ```
    pub fn new(client: Client) -> Self {
        Self { client }
    }

    /// Creates a new HdfsObjectStore using the specified URL
    ///
    /// Connect to a NameNode
    /// ```rust
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # fn main() -> object_store::Result<()> {
    /// let store = HdfsObjectStore::with_url("hdfs://127.0.0.1:9000")?;
    /// # Ok(())
    /// # }
    /// ```
    #[deprecated(since = "0.15.0", note = "Use HdfsObjectStoreBuilder instead")]
    pub fn with_url(url: &str) -> Result<Self> {
        let client = ClientBuilder::new()
            .with_url(url)
            .build()
            .to_object_store_err()?;

        Ok(Self { client })
    }

    /// Creates a new HdfsObjectStore using the specified URL and Hadoop configs.
    ///
    /// Connect to a NameService
    /// ```rust
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # use std::collections::HashMap;
    /// # fn main() -> object_store::Result<()> {
    /// let config = HashMap::from([
    ///     ("dfs.ha.namenodes.ns".to_string(), "nn1,nn2".to_string()),
    ///     ("dfs.namenode.rpc-address.ns.nn1".to_string(), "nn1.example.com:9000".to_string()),
    ///     ("dfs.namenode.rpc-address.ns.nn2".to_string(), "nn2.example.com:9000".to_string()),
    /// ]);
    /// let store = HdfsObjectStore::with_config("hdfs://ns", config)?;
    /// # Ok(())
    /// # }
    /// ```
    #[deprecated(since = "0.15.0", note = "Use HdfsObjectStoreBuilder instead")]
    pub fn with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
        let client = ClientBuilder::new()
            .with_url(url)
            .with_config(config)
            .build()
            .to_object_store_err()?;

        Ok(Self { client })
    }

    async fn open_tmp_file(&self, file_path: &str) -> Result<(FileWriter, String)> {
        let path_buf = PathBuf::from(file_path);

        let file_name = path_buf
            .file_name()
            .ok_or(HdfsError::InvalidPath("path missing filename".to_string()))
            .to_object_store_err()?
            .to_str()
            .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
            .to_object_store_err()?
            .to_string();

        let tmp_file_path = path_buf
            .with_file_name(format!(".{file_name}.tmp"))
            .to_str()
            .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
            .to_object_store_err()?
            .to_string();

        // Try to create a file with an incrementing index until we find one that doesn't exist yet
        let mut index = 1;
        loop {
            let path = format!("{tmp_file_path}.{index}");
            match self.client.create(&path, WriteOptions::default()).await {
                Ok(writer) => break Ok((writer, path)),
                Err(HdfsError::AlreadyExists(_)) => index += 1,
                Err(e) => break Err(e).to_object_store_err(),
            }
        }
    }
}

impl Display for HdfsObjectStore {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "HdfsObjectStore")
    }
}

impl From<Client> for HdfsObjectStore {
    fn from(value: Client) -> Self {
        Self { client: value }
    }
}

#[async_trait]
impl ObjectStore for HdfsObjectStore {
    /// Save the provided bytes to the specified location
    ///
    /// To make the operation atomic, we write to a temporary file `.{filename}.tmp.{i}` and rename
    /// on a successful write, where `i` is an integer that is incremented until a non-existent file
    /// is found.
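    ///
    /// A minimal sketch of a simple put, which routes through this method
    /// (the path and bytes are illustrative):
    /// ```rust,no_run
    /// # use hdfs_native_object_store::HdfsObjectStoreBuilder;
    /// # use object_store::{ObjectStore, PutPayload, path::Path};
    /// # async fn example() -> object_store::Result<()> {
    /// # let store = HdfsObjectStoreBuilder::new().with_url("hdfs://localhost:9000").build()?;
    /// let path = Path::from("data/output.bin"); // illustrative path
    /// store.put(&path, PutPayload::from_static(b"hello")).await?;
    /// # Ok(())
    /// # }
    /// ```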
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: PutOptions,
    ) -> Result<PutResult> {
        let overwrite = match opts.mode {
            PutMode::Create => false,
            PutMode::Overwrite => true,
            PutMode::Update(_) => {
                return Err(object_store::Error::NotImplemented {
                    operation: "PutOptions with Update precondition".to_string(),
                    implementer: "HdfsObjectStore".to_string(),
                });
            }
        };

        let final_file_path = make_absolute_file(location);

        // If we're not overwriting, do an upfront check to see if the file already
        // exists. Otherwise we have to write the whole file and try to rename before
        // finding out.
        if !overwrite && self.client.get_file_info(&final_file_path).await.is_ok() {
            return Err(HdfsError::AlreadyExists(final_file_path)).to_object_store_err();
        }

        let (mut tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;

        for bytes in payload {
            tmp_file.write(bytes).await.to_object_store_err()?;
        }
        tmp_file.close().await.to_object_store_err()?;

        self.client
            .rename(&tmp_file_path, &final_file_path, overwrite)
            .await
            .to_object_store_err()?;

        let e_tag = self
            .get_opts(location, GetOptions::default().with_head(true))
            .await?
            .meta
            .e_tag;

        Ok(PutResult {
            e_tag,
            version: None,
        })
    }

    /// Create a multipart writer that writes to a temporary file in a background task, and renames
    /// it to the final destination on completion.
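    ///
    /// A minimal sketch of a multipart upload via the default `put_multipart`
    /// (the path and parts are illustrative):
    /// ```rust,no_run
    /// # use hdfs_native_object_store::HdfsObjectStoreBuilder;
    /// # use object_store::{MultipartUpload, ObjectStore, PutPayload, path::Path};
    /// # async fn example() -> object_store::Result<()> {
    /// # let store = HdfsObjectStoreBuilder::new().with_url("hdfs://localhost:9000").build()?;
    /// let mut upload = store.put_multipart(&Path::from("data/large.bin")).await?;
    /// upload.put_part(PutPayload::from_static(b"part 1")).await?;
    /// upload.put_part(PutPayload::from_static(b"part 2")).await?;
    /// upload.complete().await?;
    /// # Ok(())
    /// # }
    /// ```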
    #[allow(deprecated)]
    async fn put_multipart_opts(
        &self,
        location: &Path,
        _opts: PutMultipartOpts,
    ) -> Result<Box<dyn MultipartUpload>> {
        let final_file_path = make_absolute_file(location);

        let (tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;

        Ok(Box::new(HdfsMultipartWriter::new(
            self.client.clone(),
            tmp_file,
            &tmp_file_path,
            &final_file_path,
        )))
    }

    /// Read data from the specified location.
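    ///
    /// A minimal sketch of a ranged read via the default `get_range`, which
    /// delegates to this method (the path and range are illustrative):
    /// ```rust,no_run
    /// # use hdfs_native_object_store::HdfsObjectStoreBuilder;
    /// # use object_store::{ObjectStore, path::Path};
    /// # async fn example() -> object_store::Result<()> {
    /// # let store = HdfsObjectStoreBuilder::new().with_url("hdfs://localhost:9000").build()?;
    /// // Fetch only the first kilobyte of the file
    /// let bytes = store.get_range(&Path::from("data/output.bin"), 0..1024).await?;
    /// # Ok(())
    /// # }
    /// ```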
    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
        let status = self
            .client
            .get_file_info(&make_absolute_file(location))
            .await
            .to_object_store_err()?;

        if status.isdir {
            return Err(object_store::Error::NotFound {
                path: location.to_string(),
                source: "Head cannot be called on a directory".into(),
            });
        }

        let meta = get_object_meta(&status)?;

        options.check_preconditions(&meta)?;

        let (range, payload) = if options.head {
            (
                (0..0),
                GetResultPayload::Stream(futures::stream::empty().boxed()),
            )
        } else {
            let range = options
                .range
                .map(|r| r.as_range(meta.size))
                .transpose()
                .map_err(|source| generic_error(source.into()))?
                .unwrap_or(0..meta.size);

            let reader = self
                .client
                .read(&make_absolute_file(location))
                .await
                .to_object_store_err()?;
            let start: usize = range
                .start
                .try_into()
                .expect("unable to convert range.start to usize");
            let end: usize = range
                .end
                .try_into()
                .expect("unable to convert range.end to usize");
            let stream = reader
                .read_range_stream(start, end - start)
                .map(|b| b.to_object_store_err())
                .boxed();

            (range, GetResultPayload::Stream(stream))
        };

        Ok(GetResult {
            payload,
            meta,
            range,
            attributes: Default::default(),
        })
    }

    /// Delete a stream of objects.
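    ///
    /// A minimal sketch deleting a small batch of paths (the paths are
    /// illustrative):
    /// ```rust,no_run
    /// # use futures::{StreamExt, TryStreamExt};
    /// # use hdfs_native_object_store::HdfsObjectStoreBuilder;
    /// # use object_store::{ObjectStore, path::Path};
    /// # async fn example() -> object_store::Result<()> {
    /// # let store = HdfsObjectStoreBuilder::new().with_url("hdfs://localhost:9000").build()?;
    /// let paths = futures::stream::iter([Ok(Path::from("a.bin")), Ok(Path::from("b.bin"))]).boxed();
    /// let deleted: Vec<Path> = store.delete_stream(paths).try_collect().await?;
    /// # Ok(())
    /// # }
    /// ```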
    fn delete_stream(
        &self,
        locations: BoxStream<'static, Result<Path>>,
    ) -> BoxStream<'static, Result<Path>> {
        let client = self.client.clone();
        locations
            .map(move |location| {
                let client = client.clone();
                async move {
                    let location = location?;
                    client
                        .delete(&make_absolute_file(&location), false)
                        .await
                        .to_object_store_err()?;
                    Ok(location)
                }
            })
            .buffered(10)
            .boxed()
    }

    /// List all the objects with the given prefix.
    ///
    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
    /// `foo/bar_baz/x`.
    ///
    /// Note: the order of returned [`ObjectMeta`] is not guaranteed
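    ///
    /// A minimal sketch of streaming a recursive listing (the prefix is
    /// illustrative):
    /// ```rust,no_run
    /// # use futures::TryStreamExt;
    /// # use hdfs_native_object_store::HdfsObjectStoreBuilder;
    /// # use object_store::{ObjectStore, path::Path};
    /// # async fn example() -> object_store::Result<()> {
    /// # let store = HdfsObjectStoreBuilder::new().with_url("hdfs://localhost:9000").build()?;
    /// let mut entries = store.list(Some(&Path::from("data")));
    /// while let Some(meta) = entries.try_next().await? {
    ///     println!("{}: {} bytes", meta.location, meta.size);
    /// }
    /// # Ok(())
    /// # }
    /// ```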
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
        let absolute_dir = prefix.map(make_absolute_file).unwrap_or("/".to_string());

        let status_stream = self
            .client
            .list_status_iter(&absolute_dir, true)
            .into_stream()
            .filter(move |res| {
                let result = match res {
                    // Directories aren't a thing in object stores so ignore them, and if a file is listed
                    // directly that should be ignored as well
                    Ok(status) => !status.isdir && status.path != absolute_dir,
                    // Listing by prefix should just return an empty list if the prefix isn't found
                    Err(HdfsError::FileNotFound(_)) => false,
                    _ => true,
                };
                future::ready(result)
            })
            .map(|res| res.map_or_else(|e| Err(e).to_object_store_err(), |s| get_object_meta(&s)));

        Box::pin(status_stream)
    }

    /// List objects with the given prefix and an implementation specific
    /// delimiter. Returns common prefixes (directories) in addition to object
    /// metadata.
    ///
    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
    /// `foo/bar_baz/x`.
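    ///
    /// A minimal sketch of a single-level listing (the prefix is
    /// illustrative):
    /// ```rust,no_run
    /// # use hdfs_native_object_store::HdfsObjectStoreBuilder;
    /// # use object_store::{ObjectStore, path::Path};
    /// # async fn example() -> object_store::Result<()> {
    /// # let store = HdfsObjectStoreBuilder::new().with_url("hdfs://localhost:9000").build()?;
    /// let listing = store.list_with_delimiter(Some(&Path::from("data"))).await?;
    /// println!("{} dirs, {} files", listing.common_prefixes.len(), listing.objects.len());
    /// # Ok(())
    /// # }
    /// ```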
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
        let absolute_dir = prefix.map(make_absolute_file).unwrap_or("/".to_string());

        let mut status_stream = self
            .client
            .list_status_iter(&absolute_dir, false)
            .into_stream()
            .filter(move |res| {
                let result = match res {
                    // If a file is listed directly it should be ignored
                    Ok(status) => status.path != absolute_dir,
                    // Listing by prefix should just return an empty list if the prefix isn't found
                    Err(HdfsError::FileNotFound(_)) => false,
                    _ => true,
                };
                future::ready(result)
            });

        let mut statuses = Vec::<FileStatus>::new();
        while let Some(status) = status_stream.next().await {
            statuses.push(status.to_object_store_err()?);
        }

        let mut dirs: Vec<Path> = Vec::new();
        for status in statuses.iter().filter(|s| s.isdir) {
            dirs.push(Path::parse(&status.path)?)
        }

        let mut files: Vec<ObjectMeta> = Vec::new();
        for status in statuses.iter().filter(|s| !s.isdir) {
            files.push(get_object_meta(status)?)
        }

        Ok(ListResult {
            common_prefixes: dirs,
            objects: files,
        })
    }

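    /// Move an object from one path to another, creating any missing parent
    /// directories of the target first.
    ///
    /// A minimal sketch of a rename that refuses to replace an existing
    /// target, via the default `rename_if_not_exists` (the paths are
    /// illustrative):
    /// ```rust,no_run
    /// # use hdfs_native_object_store::HdfsObjectStoreBuilder;
    /// # use object_store::{ObjectStore, path::Path};
    /// # async fn example() -> object_store::Result<()> {
    /// # let store = HdfsObjectStoreBuilder::new().with_url("hdfs://localhost:9000").build()?;
    /// store
    ///     .rename_if_not_exists(&Path::from("staging/part-0"), &Path::from("final/part-0"))
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```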
    async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
        // Make sure the parent directory exists
        let mut parent: Vec<_> = to.parts().collect();
        parent.pop();

        if !parent.is_empty() {
            let parent_path: Path = parent.into_iter().collect();
            self.client
                .mkdirs(&make_absolute_dir(&parent_path), 0o755, true)
                .await
                .to_object_store_err()?;
        }

        let overwrite = match options.target_mode {
            RenameTargetMode::Overwrite => true,
            RenameTargetMode::Create => false,
        };

        Ok(self
            .client
            .rename(
                &make_absolute_file(from),
                &make_absolute_file(to),
                overwrite,
            )
            .await
            .to_object_store_err()?)
    }

    /// Copy an object from one path to another with options.
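    ///
    /// A minimal sketch of a copy that fails if the destination already
    /// exists, via the default `copy_if_not_exists` (the paths are
    /// illustrative):
    /// ```rust,no_run
    /// # use hdfs_native_object_store::HdfsObjectStoreBuilder;
    /// # use object_store::{ObjectStore, path::Path};
    /// # async fn example() -> object_store::Result<()> {
    /// # let store = HdfsObjectStoreBuilder::new().with_url("hdfs://localhost:9000").build()?;
    /// store
    ///     .copy_if_not_exists(&Path::from("data/src.bin"), &Path::from("data/dst.bin"))
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```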
    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
        let overwrite = match options.mode {
            CopyMode::Create => {
                // Eagerly check if the target file exists first before wasting time copying
                match self.client.get_file_info(&make_absolute_file(to)).await {
                    Ok(_) => {
                        return Err(HdfsError::AlreadyExists(make_absolute_file(to)))
                            .to_object_store_err();
                    }
                    Err(HdfsError::FileNotFound(_)) => false,
                    Err(e) => return Err(e).to_object_store_err(),
                }
            }
            CopyMode::Overwrite => true,
        };

        let write_options = WriteOptions {
            overwrite,
            ..Default::default()
        };

        let file = self
            .client
            .read(&make_absolute_file(from))
            .await
            .to_object_store_err()?;
        let mut stream = file.read_range_stream(0, file.file_length()).boxed();

        let mut new_file = self
            .client
            .create(&make_absolute_file(to), write_options)
            .await
            .to_object_store_err()?;

        while let Some(bytes) = stream.next().await.transpose().to_object_store_err()? {
            new_file.write(bytes).await.to_object_store_err()?;
        }
        new_file.close().await.to_object_store_err()?;

        Ok(())
    }
}

trait HdfsErrorConvert<T> {
    fn to_object_store_err(self) -> Result<T>;
}

impl<T> HdfsErrorConvert<T> for hdfs_native::Result<T> {
    fn to_object_store_err(self) -> Result<T> {
        self.map_err(|err| match err {
            HdfsError::FileNotFound(path) => object_store::Error::NotFound {
                path: path.clone(),
                source: Box::new(HdfsError::FileNotFound(path)),
            },
            HdfsError::AlreadyExists(path) => object_store::Error::AlreadyExists {
                path: path.clone(),
                source: Box::new(HdfsError::AlreadyExists(path)),
            },
            _ => object_store::Error::Generic {
                store: "HdfsObjectStore",
                source: Box::new(err),
            },
        })
    }
}

type PartSender = mpsc::UnboundedSender<(oneshot::Sender<Result<()>>, PutPayload)>;

// Create a fake multipart writer that uploads parts to a temp file from a background
// task, submitting new parts to that task through an unbounded queue.
// A oneshot channel reports back whether each part finished writing successfully.
// On completion, the temp file is renamed to the actual target.
struct HdfsMultipartWriter {
    client: Client,
    sender: Option<(JoinHandle<Result<()>>, PartSender)>,
    tmp_filename: String,
    final_filename: String,
}

impl HdfsMultipartWriter {
    fn new(client: Client, writer: FileWriter, tmp_filename: &str, final_filename: &str) -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();

        Self {
            client,
            sender: Some((Self::start_writer_task(writer, receiver), sender)),
            tmp_filename: tmp_filename.to_string(),
            final_filename: final_filename.to_string(),
        }
    }

    fn start_writer_task(
        mut writer: FileWriter,
        mut part_receiver: mpsc::UnboundedReceiver<(oneshot::Sender<Result<()>>, PutPayload)>,
    ) -> JoinHandle<Result<()>> {
        task::spawn(async move {
            loop {
                match part_receiver.recv().await {
                    Some((sender, part)) => {
                        for bytes in part {
                            if let Err(e) = writer.write(bytes).await {
                                let _ = sender.send(Err(e).to_object_store_err());
                                return Err(generic_error("Failed to write all parts".into()));
                            }
                        }
                        let _ = sender.send(Ok(()));
                    }
                    None => return writer.close().await.to_object_store_err(),
                }
            }
        })
    }
}

impl std::fmt::Debug for HdfsMultipartWriter {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HdfsMultipartWriter")
            .field("tmp_filename", &self.tmp_filename)
            .field("final_filename", &self.final_filename)
            .finish()
    }
}

#[async_trait]
impl MultipartUpload for HdfsMultipartWriter {
    fn put_part(&mut self, payload: PutPayload) -> UploadPart {
        let (result_sender, result_receiver) = oneshot::channel();

        if let Some((_, payload_sender)) = self.sender.as_ref() {
            if let Err(mpsc::error::SendError((result_sender, _))) =
                payload_sender.send((result_sender, payload))
            {
                let _ = result_sender.send(Err(generic_error("Write task failed".into())));
            }
        } else {
            let _ = result_sender.send(Err(generic_error(
                "Cannot put part after completing or aborting".into(),
            )));
        }

        async {
            result_receiver
                .await
                .unwrap_or_else(|_| Err(generic_error("Write task failed".into())))
        }
        .boxed()
    }

    async fn complete(&mut self) -> Result<PutResult> {
        // Drop the sender so the task knows no more data is coming
        if let Some((handle, sender)) = self.sender.take() {
            drop(sender);

            // Wait for the writer task to finish
            handle.await??;

            self.client
                .rename(&self.tmp_filename, &self.final_filename, true)
                .await
                .to_object_store_err()?;

            Ok(PutResult {
                e_tag: None,
                version: None,
            })
        } else {
            Err(generic_error(
                "Cannot call abort or complete multiple times".into(),
            ))
        }
    }

    async fn abort(&mut self) -> Result<()> {
        // Drop the sender so the task knows no more data is coming
        if let Some((handle, sender)) = self.sender.take() {
            drop(sender);

            // Abort the writer task rather than waiting for it to finish
            handle.abort();

            self.client
                .delete(&self.tmp_filename, false)
                .await
                .to_object_store_err()?;

            Ok(())
        } else {
            Err(generic_error(
                "Cannot call abort or complete multiple times".into(),
            ))
        }
    }
}

/// ObjectStore paths always remove the leading slash, so add it back
fn make_absolute_file(path: &Path) -> String {
    format!("/{}", path.as_ref())
}

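/// Like [make_absolute_file], but keeps a trailing slash so the result names a directory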
fn make_absolute_dir(path: &Path) -> String {
    if path.parts().count() > 0 {
        format!("/{}/", path.as_ref())
    } else {
        "/".to_string()
    }
}

fn get_etag(status: &FileStatus) -> String {
    let size = status.length;
    let mtime = status.modification_time;

    // Use an ETag scheme based on that used by many popular HTTP servers
    // <https://httpd.apache.org/docs/2.2/mod/core.html#fileetag>
    // <https://stackoverflow.com/questions/47512043/how-etags-are-generated-and-configured>
    format!("{mtime:x}-{size:x}")
}

fn get_object_meta(status: &FileStatus) -> Result<ObjectMeta> {
    Ok(ObjectMeta {
        location: Path::parse(&status.path)?,
        last_modified: DateTime::<Utc>::from_timestamp_millis(status.modification_time as i64)
            .ok_or(generic_error(
                "Last modified timestamp out of bounds".into(),
            ))?,
        size: status.length as u64,
        e_tag: Some(get_etag(status)),
        version: None,
    })
}

#[cfg(test)]
#[cfg(feature = "integration-test")]
mod test {
    use std::collections::HashSet;

    use object_store::integration::*;
    use serial_test::serial;
    use tokio::runtime::Runtime;

    use crate::HdfsObjectStoreBuilder;

    #[tokio::test]
    #[serial]
    async fn hdfs_test() {
        let dfs = hdfs_native::minidfs::MiniDfs::with_features(&HashSet::from([
            hdfs_native::minidfs::DfsFeatures::HA,
        ]));

        let integration = HdfsObjectStoreBuilder::new()
            .with_url(&dfs.url)
            .build()
            .unwrap();

        put_get_delete_list(&integration).await;
        list_uses_directories_correctly(&integration).await;
        list_with_delimiter(&integration).await;
        rename_and_copy(&integration).await;
        copy_if_not_exists(&integration).await;
        multipart_race_condition(&integration, true).await;
        multipart_out_of_order(&integration).await;
        get_opts(&integration).await;
        put_opts(&integration, false).await;
    }

    #[test]
    #[serial]
    fn test_no_tokio() {
        let dfs = hdfs_native::minidfs::MiniDfs::with_features(&HashSet::from([
            hdfs_native::minidfs::DfsFeatures::HA,
        ]));

        let integration = HdfsObjectStoreBuilder::new()
            .with_url(&dfs.url)
            .build()
            .unwrap();

        futures::executor::block_on(get_opts(&integration));

        let rt = Runtime::new().unwrap();

        let integration = HdfsObjectStoreBuilder::new()
            .with_url(&dfs.url)
            .with_io_runtime(rt.handle().clone())
            .build()
            .unwrap();

        futures::executor::block_on(get_opts(&integration));
    }
}