hdfs_native_object_store/
lib.rs

// #![warn(missing_docs)]
//! [object_store::ObjectStore] implementation for the Native Rust HDFS client
//!
//! # Usage
//!
//! ```rust
//! use hdfs_native_object_store::HdfsObjectStore;
//! # use object_store::Result;
//! # fn main() -> Result<()> {
//! let store = HdfsObjectStore::with_url("hdfs://localhost:9000")?;
//! # Ok(())
//! # }
//! ```
//!
use std::{
    collections::HashMap,
    fmt::{Display, Formatter},
    future,
    path::PathBuf,
    sync::Arc,
};

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::{
    stream::{BoxStream, StreamExt},
    FutureExt,
};
use hdfs_native::{client::FileStatus, file::FileWriter, Client, HdfsError, WriteOptions};
use object_store::{
    path::Path, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload,
    ObjectMeta, ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
    UploadPart,
};
use tokio::{
    sync::{mpsc, oneshot},
    task::{self, JoinHandle},
};

// Re-export minidfs for downstream integration tests
#[cfg(feature = "integration-test")]
pub use hdfs_native::minidfs;

/// An [ObjectStore] implementation backed by a native Rust HDFS [Client]
#[derive(Debug)]
pub struct HdfsObjectStore {
    client: Arc<Client>,
}

impl HdfsObjectStore {
    /// Creates a new HdfsObjectStore from an existing [Client]
    ///
    /// ```rust
    /// # use std::sync::Arc;
    /// use hdfs_native::Client;
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// let client = Client::new("hdfs://127.0.0.1:9000").unwrap();
    /// let store = HdfsObjectStore::new(Arc::new(client));
    /// ```
    pub fn new(client: Arc<Client>) -> Self {
        Self { client }
    }

    /// Creates a new HdfsObjectStore using the specified URL
    ///
    /// Connect to a NameNode
    /// ```rust
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # fn main() -> object_store::Result<()> {
    /// let store = HdfsObjectStore::with_url("hdfs://127.0.0.1:9000")?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_url(url: &str) -> Result<Self> {
        Ok(Self::new(Arc::new(Client::new(url).to_object_store_err()?)))
    }

    /// Creates a new HdfsObjectStore using the specified URL and Hadoop configs.
    ///
    /// Connect to a NameService
    /// ```rust
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # use std::collections::HashMap;
    /// # fn main() -> object_store::Result<()> {
    /// let config = HashMap::from([
    ///     ("dfs.ha.namenodes.ns".to_string(), "nn1,nn2".to_string()),
    ///     ("dfs.namenode.rpc-address.ns.nn1".to_string(), "nn1.example.com:9000".to_string()),
    ///     ("dfs.namenode.rpc-address.ns.nn2".to_string(), "nn2.example.com:9000".to_string()),
    /// ]);
    /// let store = HdfsObjectStore::with_config("hdfs://ns", config)?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
        Ok(Self::new(Arc::new(
            Client::new_with_config(url, config).to_object_store_err()?,
        )))
    }

    async fn internal_copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
        // Fail early if the destination already exists and overwriting isn't allowed
        let overwrite = match self.client.get_file_info(&make_absolute_file(to)).await {
            Ok(_) if overwrite => true,
            Ok(_) => Err(HdfsError::AlreadyExists(make_absolute_file(to))).to_object_store_err()?,
            Err(HdfsError::FileNotFound(_)) => false,
            Err(e) => Err(e).to_object_store_err()?,
        };

        let write_options = WriteOptions {
            overwrite,
            ..Default::default()
        };

        // Stream the source file's bytes into a newly created destination file
        let file = self
            .client
            .read(&make_absolute_file(from))
            .await
            .to_object_store_err()?;
        let mut stream = file.read_range_stream(0, file.file_length()).boxed();

        let mut new_file = self
            .client
            .create(&make_absolute_file(to), write_options)
            .await
            .to_object_store_err()?;

        while let Some(bytes) = stream.next().await.transpose().to_object_store_err()? {
            new_file.write(bytes).await.to_object_store_err()?;
        }
        new_file.close().await.to_object_store_err()?;

        Ok(())
    }

    async fn open_tmp_file(&self, file_path: &str) -> Result<(FileWriter, String)> {
        let path_buf = PathBuf::from(file_path);

        let file_name = path_buf
            .file_name()
            .ok_or(HdfsError::InvalidPath("path missing filename".to_string()))
            .to_object_store_err()?
            .to_str()
            .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
            .to_object_store_err()?
            .to_string();

        let tmp_file_path = path_buf
            .with_file_name(format!(".{}.tmp", file_name))
            .to_str()
            .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
            .to_object_store_err()?
            .to_string();

        // Try to create a file with an incrementing index until we find one that doesn't exist yet
        let mut index = 1;
        loop {
            let path = format!("{}.{}", tmp_file_path, index);
            match self.client.create(&path, WriteOptions::default()).await {
                Ok(writer) => break Ok((writer, path)),
                Err(HdfsError::AlreadyExists(_)) => index += 1,
                Err(e) => break Err(e).to_object_store_err(),
            }
        }
    }
}

impl Display for HdfsObjectStore {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "HdfsObjectStore")
    }
}

impl From<Client> for HdfsObjectStore {
    fn from(value: Client) -> Self {
        Self::new(Arc::new(value))
    }
}

#[async_trait]
impl ObjectStore for HdfsObjectStore {
    /// Save the provided bytes to the specified location
    ///
    /// To make the operation atomic, we write to a temporary file `.{filename}.tmp.{i}` and rename
    /// on a successful write, where `i` is an integer that is incremented until a non-existent file
    /// is found.
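    ///
    /// A minimal usage sketch; the path and payload below are illustrative, not from
    /// the original docs:
    /// ```rust
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # use object_store::{path::Path, ObjectStore, PutPayload};
    /// # async fn example(store: HdfsObjectStore) -> object_store::Result<()> {
    /// // Writes to `/data/file.txt` via a hidden `/data/.file.txt.tmp.{i}` temp file
    /// store
    ///     .put(&Path::from("data/file.txt"), PutPayload::from_static(b"hello"))
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```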
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: PutOptions,
    ) -> Result<PutResult> {
        let overwrite = match opts.mode {
            PutMode::Create => false,
            PutMode::Overwrite => true,
            PutMode::Update(_) => {
                return Err(object_store::Error::NotSupported {
                    source: "Update mode not supported".to_string().into(),
                })
            }
        };

        let final_file_path = make_absolute_file(location);

        // If we're not overwriting, do an upfront check to see if the file already
        // exists. Otherwise we have to write the whole file and try to rename before
        // finding out.
        if !overwrite && self.client.get_file_info(&final_file_path).await.is_ok() {
            return Err(HdfsError::AlreadyExists(final_file_path)).to_object_store_err();
        }

        let (mut tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;

        for bytes in payload {
            tmp_file.write(bytes).await.to_object_store_err()?;
        }
        tmp_file.close().await.to_object_store_err()?;

        self.client
            .rename(&tmp_file_path, &final_file_path, overwrite)
            .await
            .to_object_store_err()?;

        Ok(PutResult {
            e_tag: None,
            version: None,
        })
    }

    /// Create a multipart writer that writes to a temporary file in a background task, and renames
    /// to the final destination on complete.
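    ///
    /// A minimal usage sketch; the path and payloads below are illustrative:
    /// ```rust
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # use object_store::{path::Path, ObjectStore, PutPayload};
    /// # async fn example(store: HdfsObjectStore) -> object_store::Result<()> {
    /// // Parts are queued to a single background task and written in order
    /// let mut upload = store.put_multipart(&Path::from("data/large.bin")).await?;
    /// upload.put_part(PutPayload::from_static(b"part 1")).await?;
    /// upload.put_part(PutPayload::from_static(b"part 2")).await?;
    /// upload.complete().await?;
    /// # Ok(())
    /// # }
    /// ```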
    async fn put_multipart_opts(
        &self,
        location: &Path,
        _opts: PutMultipartOpts,
    ) -> Result<Box<dyn MultipartUpload>> {
        let final_file_path = make_absolute_file(location);

        let (tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;

        Ok(Box::new(HdfsMultipartWriter::new(
            Arc::clone(&self.client),
            tmp_file,
            &tmp_file_path,
            &final_file_path,
        )))
    }

    /// Reads data for the specified location.
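    ///
    /// A minimal sketch of a ranged read; the path and range are illustrative:
    /// ```rust
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # use object_store::{path::Path, GetOptions, GetRange, ObjectStore};
    /// # async fn example(store: HdfsObjectStore) -> object_store::Result<()> {
    /// // Fetch only the first 1024 bytes of the object
    /// let options = GetOptions {
    ///     range: Some(GetRange::Bounded(0..1024)),
    ///     ..Default::default()
    /// };
    /// let bytes = store
    ///     .get_opts(&Path::from("data/file.txt"), options)
    ///     .await?
    ///     .bytes()
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```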
    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
        // Conditional reads aren't currently supported
        if options.if_match.is_some()
            || options.if_none_match.is_some()
            || options.if_modified_since.is_some()
            || options.if_unmodified_since.is_some()
        {
            return Err(object_store::Error::NotImplemented);
        }

        let meta = self.head(location).await?;

        // Resolve the requested range to concrete start and end offsets
        let range = options
            .range
            .map(|r| match r {
                GetRange::Bounded(range) => range,
                GetRange::Offset(offset) => offset..meta.size,
                GetRange::Suffix(suffix) => meta.size.saturating_sub(suffix)..meta.size,
            })
            .unwrap_or(0..meta.size);

        let reader = self
            .client
            .read(&make_absolute_file(location))
            .await
            .to_object_store_err()?;
        let start: usize = range
            .start
            .try_into()
            .expect("unable to convert range.start to usize");
        let end: usize = range
            .end
            .try_into()
            .expect("unable to convert range.end to usize");
        let stream = reader
            .read_range_stream(start, end - start)
            .map(|b| b.to_object_store_err())
            .boxed();

        let payload = GetResultPayload::Stream(stream);

        Ok(GetResult {
            payload,
            meta,
            range,
            attributes: Default::default(),
        })
    }

    /// Return the metadata for the specified location
    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
        let status = self
            .client
            .get_file_info(&make_absolute_file(location))
            .await
            .to_object_store_err()?;

        if status.isdir {
            return Err(HdfsError::IsADirectoryError(
                "Head must be called on a file".to_string(),
            ))
            .to_object_store_err();
        }

        get_object_meta(&status)
    }

    /// Delete the object at the specified location.
    async fn delete(&self, location: &Path) -> Result<()> {
        let result = self
            .client
            .delete(&make_absolute_file(location), false)
            .await
            .to_object_store_err()?;

        if !result {
            Err(HdfsError::OperationFailed(
                "failed to delete object".to_string(),
            ))
            .to_object_store_err()?
        }

        Ok(())
    }

    /// List all the objects with the given prefix.
    ///
    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
    /// `foo/bar_baz/x`.
    ///
    /// Note: the order of returned [`ObjectMeta`] is not guaranteed
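    ///
    /// A minimal usage sketch; the prefix is illustrative:
    /// ```rust
    /// # use futures::TryStreamExt;
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # use object_store::{path::Path, ObjectStore};
    /// # async fn example(store: HdfsObjectStore) -> object_store::Result<()> {
    /// // Recursively collect every file under /foo/bar/
    /// let objects: Vec<_> = store
    ///     .list(Some(&Path::from("foo/bar")))
    ///     .try_collect()
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```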
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
        let status_stream = self
            .client
            .list_status_iter(
                &prefix.map(make_absolute_dir).unwrap_or("".to_string()),
                true,
            )
            .into_stream()
            .filter(|res| {
                let result = match res {
                    Ok(status) => !status.isdir,
                    // Listing by prefix should just return an empty list if the prefix isn't found
                    Err(HdfsError::FileNotFound(_)) => false,
                    _ => true,
                };
                future::ready(result)
            })
            .map(|res| res.map_or_else(|e| Err(e).to_object_store_err(), |s| get_object_meta(&s)));

        Box::pin(status_stream)
    }

    /// List objects with the given prefix and an implementation specific
    /// delimiter. Returns common prefixes (directories) in addition to object
    /// metadata.
    ///
    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
    /// `foo/bar_baz/x`.
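    ///
    /// A minimal usage sketch; the prefix is illustrative:
    /// ```rust
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # use object_store::{path::Path, ObjectStore};
    /// # async fn example(store: HdfsObjectStore) -> object_store::Result<()> {
    /// // Non-recursive listing: subdirectories come back as common_prefixes
    /// let result = store.list_with_delimiter(Some(&Path::from("foo"))).await?;
    /// println!("{} dirs, {} files", result.common_prefixes.len(), result.objects.len());
    /// # Ok(())
    /// # }
    /// ```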
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
        let mut status_stream = self
            .client
            .list_status_iter(
                &prefix.map(make_absolute_dir).unwrap_or("".to_string()),
                false,
            )
            .into_stream()
            .filter(|res| {
                let result = match res {
                    // Listing by prefix should just return an empty list if the prefix isn't found
                    Err(HdfsError::FileNotFound(_)) => false,
                    _ => true,
                };
                future::ready(result)
            });

        let mut statuses = Vec::<FileStatus>::new();
        while let Some(status) = status_stream.next().await {
            statuses.push(status.to_object_store_err()?);
        }

        let mut dirs: Vec<Path> = Vec::new();
        for status in statuses.iter().filter(|s| s.isdir) {
            dirs.push(Path::parse(&status.path)?)
        }

        let mut files: Vec<ObjectMeta> = Vec::new();
        for status in statuses.iter().filter(|s| !s.isdir) {
            files.push(get_object_meta(status)?)
        }

        Ok(ListResult {
            common_prefixes: dirs,
            objects: files,
        })
    }

    /// Renames a file. This operation is guaranteed to be atomic.
    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
        Ok(self
            .client
            .rename(&make_absolute_file(from), &make_absolute_file(to), true)
            .await
            .to_object_store_err()?)
    }

    /// Renames a file only if the destination doesn't exist. This operation is guaranteed
    /// to be atomic.
    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        Ok(self
            .client
            .rename(&make_absolute_file(from), &make_absolute_file(to), false)
            .await
            .to_object_store_err()?)
    }

    /// Copy an object from one path to another in the same object store.
    ///
    /// If there exists an object at the destination, it will be overwritten.
    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
        self.internal_copy(from, to, true).await
    }

    /// Copy an object from one path to another, only if destination is empty.
    ///
    /// Will return an error if the destination already has an object.
    ///
    /// Performs an atomic operation if the underlying object storage supports it.
    /// If atomic operations are not supported by the underlying object storage (like S3)
    /// it will return an error.
    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        self.internal_copy(from, to, false).await
    }
}

#[cfg(feature = "integration-test")]
pub trait HdfsErrorConvert<T> {
    fn to_object_store_err(self) -> Result<T>;
}

#[cfg(not(feature = "integration-test"))]
trait HdfsErrorConvert<T> {
    fn to_object_store_err(self) -> Result<T>;
}

impl<T> HdfsErrorConvert<T> for hdfs_native::Result<T> {
    fn to_object_store_err(self) -> Result<T> {
        self.map_err(|err| match err {
            HdfsError::FileNotFound(path) => object_store::Error::NotFound {
                path: path.clone(),
                source: Box::new(HdfsError::FileNotFound(path)),
            },
            HdfsError::AlreadyExists(path) => object_store::Error::AlreadyExists {
                path: path.clone(),
                source: Box::new(HdfsError::AlreadyExists(path)),
            },
            _ => object_store::Error::Generic {
                store: "HdfsObjectStore",
                source: Box::new(err),
            },
        })
    }
}

type PartSender = mpsc::UnboundedSender<(oneshot::Sender<Result<()>>, PutPayload)>;

// A "fake" multipart writer that spawns a background task to upload parts to a temp
// file, and submits new parts to that task through an unbounded queue.
// A oneshot channel per part signals whether that part finished writing successfully.
// On complete, the temp file is renamed to the final destination.
struct HdfsMultipartWriter {
    client: Arc<Client>,
    sender: Option<(JoinHandle<Result<()>>, PartSender)>,
    tmp_filename: String,
    final_filename: String,
}

impl HdfsMultipartWriter {
    fn new(
        client: Arc<Client>,
        writer: FileWriter,
        tmp_filename: &str,
        final_filename: &str,
    ) -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();

        Self {
            client,
            sender: Some((Self::start_writer_task(writer, receiver), sender)),
            tmp_filename: tmp_filename.to_string(),
            final_filename: final_filename.to_string(),
        }
    }

    fn start_writer_task(
        mut writer: FileWriter,
        mut part_receiver: mpsc::UnboundedReceiver<(oneshot::Sender<Result<()>>, PutPayload)>,
    ) -> JoinHandle<Result<()>> {
        task::spawn(async move {
            'outer: loop {
                match part_receiver.recv().await {
                    Some((sender, part)) => {
                        for bytes in part {
                            if let Err(e) = writer.write(bytes).await.to_object_store_err() {
                                let _ = sender.send(Err(e));
                                break 'outer;
                            }
                        }
                        let _ = sender.send(Ok(()));
                    }
                    None => {
                        return writer.close().await.to_object_store_err();
                    }
                }
            }

            // If we've reached here, a write failed, so just return Err's for any new parts that come in
            while let Some((sender, _)) = part_receiver.recv().await {
                let _ = sender.send(
                    Err(HdfsError::OperationFailed(
                        "Write failed during one of the parts".to_string(),
                    ))
                    .to_object_store_err(),
                );
            }
            Err(HdfsError::OperationFailed(
                "Write failed during one of the parts".to_string(),
            ))
            .to_object_store_err()
        })
    }
}

impl std::fmt::Debug for HdfsMultipartWriter {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HdfsMultipartWriter")
            .field("tmp_filename", &self.tmp_filename)
            .field("final_filename", &self.final_filename)
            .finish()
    }
}

#[async_trait]
impl MultipartUpload for HdfsMultipartWriter {
    fn put_part(&mut self, payload: PutPayload) -> UploadPart {
        let (result_sender, result_receiver) = oneshot::channel();

        if let Some((_, payload_sender)) = self.sender.as_ref() {
            payload_sender.send((result_sender, payload)).unwrap();
        } else {
            result_sender
                .send(
                    Err(HdfsError::OperationFailed(
                        "Cannot put part after completing or aborting".to_string(),
                    ))
                    .to_object_store_err(),
                )
                .unwrap();
        }

        async { result_receiver.await.unwrap() }.boxed()
    }

    async fn complete(&mut self) -> Result<PutResult> {
        // Drop the sender so the task knows no more data is coming
        if let Some((handle, sender)) = self.sender.take() {
            drop(sender);

            // Wait for the writer task to finish
            handle.await??;

            self.client
                .rename(&self.tmp_filename, &self.final_filename, true)
                .await
                .to_object_store_err()?;

            Ok(PutResult {
                e_tag: None,
                version: None,
            })
        } else {
            Err(object_store::Error::NotSupported {
                source: "Cannot call abort or complete multiple times".into(),
            })
        }
    }

    async fn abort(&mut self) -> Result<()> {
        // Drop the sender so the task knows no more data is coming
        if let Some((handle, sender)) = self.sender.take() {
            drop(sender);

            // Cancel the writer task and clean up the temp file
            handle.abort();

            self.client
                .delete(&self.tmp_filename, false)
                .await
                .to_object_store_err()?;

            Ok(())
        } else {
            Err(object_store::Error::NotSupported {
                source: "Cannot call abort or complete multiple times".into(),
            })
        }
    }
}

/// ObjectStore paths always remove the leading slash, so add it back
fn make_absolute_file(path: &Path) -> String {
    format!("/{}", path.as_ref())
}

/// Add the leading slash back, along with a trailing slash to mark the path as a directory
fn make_absolute_dir(path: &Path) -> String {
    if path.parts().count() > 0 {
        format!("/{}/", path.as_ref())
    } else {
        "/".to_string()
    }
}

fn get_object_meta(status: &FileStatus) -> Result<ObjectMeta> {
    Ok(ObjectMeta {
        location: Path::parse(&status.path)?,
        last_modified: DateTime::<Utc>::from_timestamp_millis(status.modification_time as i64)
            .unwrap(),
        size: status
            .length
            .try_into()
            .expect("unable to convert status.length to usize"),
        e_tag: None,
        version: None,
    })
}
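
// A minimal sketch of unit tests for the path helpers above, illustrating the
// expected mapping between `object_store` paths and absolute HDFS paths. These
// cases are illustrative additions, not part of the original test suite.
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn test_make_absolute_file() {
        // ObjectStore paths have no leading slash; HDFS expects absolute paths
        assert_eq!(make_absolute_file(&Path::from("foo/bar.txt")), "/foo/bar.txt");
    }

    #[test]
    fn test_make_absolute_dir() {
        // Directories get a trailing slash so children are listed correctly
        assert_eq!(make_absolute_dir(&Path::from("foo/bar")), "/foo/bar/");
        // An empty path maps to the filesystem root
        assert_eq!(make_absolute_dir(&Path::from("")), "/");
    }
}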