hdfs_native_object_store/lib.rs

// #![warn(missing_docs)]
//! [object_store::ObjectStore] implementation for the Native Rust HDFS client
//!
//! # Usage
//!
//! ```rust
//! use hdfs_native_object_store::HdfsObjectStore;
//! # use object_store::Result;
//! # fn main() -> Result<()> {
//! let store = HdfsObjectStore::with_url("hdfs://localhost:9000")?;
//! # Ok(())
//! # }
//! ```
//!
use std::{
    collections::HashMap,
    fmt::{Display, Formatter},
    future,
    path::PathBuf,
    sync::Arc,
};

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::{
    stream::{BoxStream, StreamExt},
    FutureExt,
};
use hdfs_native::{client::FileStatus, file::FileWriter, Client, HdfsError, WriteOptions};
use object_store::{
    path::Path, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload,
    ObjectMeta, ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
    UploadPart,
};
use tokio::{
    sync::{mpsc, oneshot},
    task::{self, JoinHandle},
};

// Re-export minidfs for downstream integration tests
#[cfg(feature = "integration-test")]
pub use hdfs_native::minidfs;

/// An [ObjectStore] implementation for HDFS backed by the native Rust [Client]
#[derive(Debug)]
pub struct HdfsObjectStore {
    client: Arc<Client>,
}

impl HdfsObjectStore {
    /// Creates a new HdfsObjectStore from an existing [Client]
    ///
    /// ```rust
    /// # use std::sync::Arc;
    /// use hdfs_native::Client;
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// let client = Client::new("hdfs://127.0.0.1:9000").unwrap();
    /// let store = HdfsObjectStore::new(Arc::new(client));
    /// ```
    pub fn new(client: Arc<Client>) -> Self {
        Self { client }
    }

    /// Creates a new HdfsObjectStore using the specified URL
    ///
    /// Connect to a NameNode
    /// ```rust
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # fn main() -> object_store::Result<()> {
    /// let store = HdfsObjectStore::with_url("hdfs://127.0.0.1:9000")?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_url(url: &str) -> Result<Self> {
        Ok(Self::new(Arc::new(Client::new(url).to_object_store_err()?)))
    }

    /// Creates a new HdfsObjectStore using the specified URL and Hadoop configs.
    ///
    /// Connect to a NameService
    /// ```rust
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # use std::collections::HashMap;
    /// # fn main() -> object_store::Result<()> {
    /// let config = HashMap::from([
    ///     ("dfs.ha.namenodes.ns".to_string(), "nn1,nn2".to_string()),
    ///     ("dfs.namenode.rpc-address.ns.nn1".to_string(), "nn1.example.com:9000".to_string()),
    ///     ("dfs.namenode.rpc-address.ns.nn2".to_string(), "nn2.example.com:9000".to_string()),
    /// ]);
    /// let store = HdfsObjectStore::with_config("hdfs://ns", config)?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
        Ok(Self::new(Arc::new(
            Client::new_with_config(url, config).to_object_store_err()?,
        )))
    }

    /// Copies a file by streaming the source contents into a newly created destination file.
    async fn internal_copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
        // Check the destination up front so we fail fast if it exists and overwriting is not
        // allowed, and only request overwrite semantics on create when it actually exists
        let overwrite = match self.client.get_file_info(&make_absolute_file(to)).await {
            Ok(_) if overwrite => true,
            Ok(_) => Err(HdfsError::AlreadyExists(make_absolute_file(to))).to_object_store_err()?,
            Err(HdfsError::FileNotFound(_)) => false,
            Err(e) => Err(e).to_object_store_err()?,
        };

        let write_options = WriteOptions {
            overwrite,
            ..Default::default()
        };

        let file = self
            .client
            .read(&make_absolute_file(from))
            .await
            .to_object_store_err()?;
        let mut stream = file.read_range_stream(0, file.file_length()).boxed();

        let mut new_file = self
            .client
            .create(&make_absolute_file(to), write_options)
            .await
            .to_object_store_err()?;

        while let Some(bytes) = stream.next().await.transpose().to_object_store_err()? {
            new_file.write(bytes).await.to_object_store_err()?;
        }
        new_file.close().await.to_object_store_err()?;

        Ok(())
    }

    /// Opens a temporary file named `.{filename}.tmp.{index}` next to the target for writing,
    /// returning the writer and the path of the temporary file that was created.
    async fn open_tmp_file(&self, file_path: &str) -> Result<(FileWriter, String)> {
        let path_buf = PathBuf::from(file_path);

        let file_name = path_buf
            .file_name()
            .ok_or(HdfsError::InvalidPath("path missing filename".to_string()))
            .to_object_store_err()?
            .to_str()
            .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
            .to_object_store_err()?
            .to_string();

        let tmp_file_path = path_buf
            .with_file_name(format!(".{}.tmp", file_name))
            .to_str()
            .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
            .to_object_store_err()?
            .to_string();

        // Try to create a file with an incrementing index until we find one that doesn't exist yet
        let mut index = 1;
        loop {
            let path = format!("{}.{}", tmp_file_path, index);
            match self.client.create(&path, WriteOptions::default()).await {
                Ok(writer) => break Ok((writer, path)),
                Err(HdfsError::AlreadyExists(_)) => index += 1,
                Err(e) => break Err(e).to_object_store_err(),
            }
        }
    }
}

impl Display for HdfsObjectStore {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "HdfsObjectStore")
    }
}

impl From<Client> for HdfsObjectStore {
    fn from(value: Client) -> Self {
        Self::new(Arc::new(value))
    }
}

#[async_trait]
impl ObjectStore for HdfsObjectStore {
    /// Save the provided bytes to the specified location
    ///
    /// To make the operation atomic, we write to a temporary file `.{filename}.tmp.{i}` and rename
    /// on a successful write, where `i` is an integer that is incremented until a non-existent file
    /// is found.
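    ///
    /// A minimal sketch of a conditional create (hypothetical path and payload, assuming a
    /// reachable cluster):
    /// ```rust
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # use object_store::{path::Path, ObjectStore, PutMode, PutOptions, PutPayload};
    /// # async fn example(store: HdfsObjectStore) -> object_store::Result<()> {
    /// let opts: PutOptions = PutMode::Create.into(); // error if the file already exists
    /// store
    ///     .put_opts(&Path::from("data/out.txt"), PutPayload::from("hello"), opts)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```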
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: PutOptions,
    ) -> Result<PutResult> {
        let overwrite = match opts.mode {
            PutMode::Create => false,
            PutMode::Overwrite => true,
            PutMode::Update(_) => {
                return Err(object_store::Error::NotSupported {
                    source: "Update mode not supported".to_string().into(),
                })
            }
        };

        let final_file_path = make_absolute_file(location);

        // If we're not overwriting, do an upfront check to see if the file already
        // exists. Otherwise we have to write the whole file and try to rename before
        // finding out.
        if !overwrite && self.client.get_file_info(&final_file_path).await.is_ok() {
            return Err(HdfsError::AlreadyExists(final_file_path)).to_object_store_err();
        }

        let (mut tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;

        for bytes in payload {
            tmp_file.write(bytes).await.to_object_store_err()?;
        }
        tmp_file.close().await.to_object_store_err()?;

        self.client
            .rename(&tmp_file_path, &final_file_path, overwrite)
            .await
            .to_object_store_err()?;

        Ok(PutResult {
            e_tag: None,
            version: None,
        })
    }

    /// Create a multipart writer that writes to a temporary file in a background task, and renames
    /// to the final destination on complete.
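    ///
    /// A sketch of typical usage via the [ObjectStore] trait (hypothetical path and parts,
    /// assuming a reachable cluster):
    /// ```rust
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # use object_store::{path::Path, ObjectStore, PutPayload};
    /// # async fn example(store: HdfsObjectStore) -> object_store::Result<()> {
    /// let mut upload = store.put_multipart(&Path::from("data/large.bin")).await?;
    /// upload.put_part(PutPayload::from("part 1")).await?;
    /// upload.put_part(PutPayload::from("part 2")).await?;
    /// upload.complete().await?; // renames the temp file to data/large.bin
    /// # Ok(())
    /// # }
    /// ```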
    async fn put_multipart_opts(
        &self,
        location: &Path,
        _opts: PutMultipartOpts,
    ) -> Result<Box<dyn MultipartUpload>> {
        let final_file_path = make_absolute_file(location);

        let (tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;

        Ok(Box::new(HdfsMultipartWriter::new(
            Arc::clone(&self.client),
            tmp_file,
            &tmp_file_path,
            &final_file_path,
        )))
    }

    /// Reads data for the specified location.
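    ///
    /// A sketch of a ranged read (hypothetical path and range, assuming a reachable cluster):
    /// ```rust
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # use object_store::{path::Path, GetOptions, GetRange, ObjectStore};
    /// # async fn example(store: HdfsObjectStore) -> object_store::Result<()> {
    /// let options = GetOptions {
    ///     range: Some(GetRange::Bounded(0..1024)),
    ///     ..Default::default()
    /// };
    /// let result = store.get_opts(&Path::from("data/out.txt"), options).await?;
    /// let bytes = result.bytes().await?;
    /// # Ok(())
    /// # }
    /// ```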
    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
        if options.if_match.is_some()
            || options.if_none_match.is_some()
            || options.if_modified_since.is_some()
            || options.if_unmodified_since.is_some()
        {
            return Err(object_store::Error::NotImplemented);
        }

        let meta = self.head(location).await?;

        let range = options
            .range
            .map(|r| match r {
                GetRange::Bounded(range) => range,
                GetRange::Offset(offset) => offset..meta.size,
                GetRange::Suffix(suffix) => meta.size.saturating_sub(suffix)..meta.size,
            })
            .unwrap_or(0..meta.size);

        let reader = self
            .client
            .read(&make_absolute_file(location))
            .await
            .to_object_store_err()?;
        let stream = reader
            .read_range_stream(range.start, range.end - range.start)
            .map(|b| b.to_object_store_err())
            .boxed();

        let payload = GetResultPayload::Stream(stream);

        Ok(GetResult {
            payload,
            meta,
            range,
            attributes: Default::default(),
        })
    }

    /// Return the metadata for the specified location
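    ///
    /// A sketch (hypothetical path, assuming a reachable cluster):
    /// ```rust
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # use object_store::{path::Path, ObjectStore};
    /// # async fn example(store: HdfsObjectStore) -> object_store::Result<()> {
    /// let meta = store.head(&Path::from("data/out.txt")).await?;
    /// println!("{} is {} bytes", meta.location, meta.size);
    /// # Ok(())
    /// # }
    /// ```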
    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
        let status = self
            .client
            .get_file_info(&make_absolute_file(location))
            .await
            .to_object_store_err()?;

        if status.isdir {
            return Err(HdfsError::IsADirectoryError(
                "Head must be called on a file".to_string(),
            ))
            .to_object_store_err();
        }

        get_object_meta(&status)
    }

    /// Delete the object at the specified location.
    async fn delete(&self, location: &Path) -> Result<()> {
        let result = self
            .client
            .delete(&make_absolute_file(location), false)
            .await
            .to_object_store_err()?;

        if !result {
            Err(HdfsError::OperationFailed(
                "failed to delete object".to_string(),
            ))
            .to_object_store_err()?
        }

        Ok(())
    }

    /// List all the objects with the given prefix.
    ///
    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
    /// `foo/bar_baz/x`.
    ///
    /// Note: the order of returned [`ObjectMeta`] is not guaranteed
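    ///
    /// A sketch of draining the stream into a `Vec` (hypothetical prefix, assuming a
    /// reachable cluster):
    /// ```rust
    /// # use futures::TryStreamExt;
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # use object_store::{path::Path, ObjectMeta, ObjectStore};
    /// # async fn example(store: HdfsObjectStore) -> object_store::Result<()> {
    /// let prefix = Path::from("data");
    /// let objects: Vec<ObjectMeta> = store.list(Some(&prefix)).try_collect().await?;
    /// # Ok(())
    /// # }
    /// ```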
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
        let status_stream = self
            .client
            .list_status_iter(
                &prefix.map(make_absolute_dir).unwrap_or("".to_string()),
                true,
            )
            .into_stream()
            .filter(|res| {
                let result = match res {
                    Ok(status) => !status.isdir,
                    // Listing by prefix should just return an empty list if the prefix isn't found
                    Err(HdfsError::FileNotFound(_)) => false,
                    _ => true,
                };
                future::ready(result)
            })
            .map(|res| res.map_or_else(|e| Err(e).to_object_store_err(), |s| get_object_meta(&s)));

        Box::pin(status_stream)
    }

    /// List objects with the given prefix and an implementation specific
    /// delimiter. Returns common prefixes (directories) in addition to object
    /// metadata.
    ///
    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
    /// `foo/bar_baz/x`.
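    ///
    /// A sketch of a single-level listing (hypothetical prefix, assuming a reachable cluster):
    /// ```rust
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # use object_store::{path::Path, ObjectStore};
    /// # async fn example(store: HdfsObjectStore) -> object_store::Result<()> {
    /// let listing = store.list_with_delimiter(Some(&Path::from("data"))).await?;
    /// println!(
    ///     "{} directories and {} files",
    ///     listing.common_prefixes.len(),
    ///     listing.objects.len()
    /// );
    /// # Ok(())
    /// # }
    /// ```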
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
        let mut status_stream = self
            .client
            .list_status_iter(
                &prefix.map(make_absolute_dir).unwrap_or("".to_string()),
                false,
            )
            .into_stream()
            .filter(|res| {
                let result = match res {
                    // Listing by prefix should just return an empty list if the prefix isn't found
                    Err(HdfsError::FileNotFound(_)) => false,
                    _ => true,
                };
                future::ready(result)
            });

        let mut statuses = Vec::<FileStatus>::new();
        while let Some(status) = status_stream.next().await {
            statuses.push(status.to_object_store_err()?);
        }

        let mut dirs: Vec<Path> = Vec::new();
        for status in statuses.iter().filter(|s| s.isdir) {
            dirs.push(Path::parse(&status.path)?)
        }

        let mut files: Vec<ObjectMeta> = Vec::new();
        for status in statuses.iter().filter(|s| !s.isdir) {
            files.push(get_object_meta(status)?)
        }

        Ok(ListResult {
            common_prefixes: dirs,
            objects: files,
        })
    }

    /// Renames a file. This operation is guaranteed to be atomic.
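    ///
    /// A sketch (hypothetical paths, assuming a reachable cluster):
    /// ```rust
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # use object_store::{path::Path, ObjectStore};
    /// # async fn example(store: HdfsObjectStore) -> object_store::Result<()> {
    /// store.rename(&Path::from("data/old.txt"), &Path::from("data/new.txt")).await?;
    /// # Ok(())
    /// # }
    /// ```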
    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
        Ok(self
            .client
            .rename(&make_absolute_file(from), &make_absolute_file(to), true)
            .await
            .to_object_store_err()?)
    }

    /// Renames a file only if the destination doesn't exist. This operation is guaranteed
    /// to be atomic.
    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        Ok(self
            .client
            .rename(&make_absolute_file(from), &make_absolute_file(to), false)
            .await
            .to_object_store_err()?)
    }

    /// Copy an object from one path to another in the same object store.
    ///
    /// If there exists an object at the destination, it will be overwritten.
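    ///
    /// A sketch (hypothetical paths, assuming a reachable cluster):
    /// ```rust
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # use object_store::{path::Path, ObjectStore};
    /// # async fn example(store: HdfsObjectStore) -> object_store::Result<()> {
    /// store.copy(&Path::from("data/src.txt"), &Path::from("data/dst.txt")).await?;
    /// # Ok(())
    /// # }
    /// ```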
    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
        self.internal_copy(from, to, true).await
    }

    /// Copy an object from one path to another, only if the destination is empty.
    ///
    /// Will return an error if the destination already has an object.
    ///
    /// Performs an atomic operation if the underlying object storage supports it.
    /// If atomic operations are not supported by the underlying object storage (like S3)
    /// it will return an error.
    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        self.internal_copy(from, to, false).await
    }
}

/// Converts results from the underlying [hdfs_native] client into [object_store] results,
/// mapping file-not-found and already-exists errors to their dedicated variants
#[cfg(feature = "integration-test")]
pub trait HdfsErrorConvert<T> {
    fn to_object_store_err(self) -> Result<T>;
}

#[cfg(not(feature = "integration-test"))]
trait HdfsErrorConvert<T> {
    fn to_object_store_err(self) -> Result<T>;
}

impl<T> HdfsErrorConvert<T> for hdfs_native::Result<T> {
    fn to_object_store_err(self) -> Result<T> {
        self.map_err(|err| match err {
            HdfsError::FileNotFound(path) => object_store::Error::NotFound {
                path: path.clone(),
                source: Box::new(HdfsError::FileNotFound(path)),
            },
            HdfsError::AlreadyExists(path) => object_store::Error::AlreadyExists {
                path: path.clone(),
                source: Box::new(HdfsError::AlreadyExists(path)),
            },
            _ => object_store::Error::Generic {
                store: "HdfsObjectStore",
                source: Box::new(err),
            },
        })
    }
}

type PartSender = mpsc::UnboundedSender<(oneshot::Sender<Result<()>>, PutPayload)>;

// A fake multipart writer that uploads to a temp file from a background task, with new
// parts submitted to the task through a queue. A oneshot channel per part signals whether
// that part finished writing. On complete, the temp file is renamed to the actual target.
struct HdfsMultipartWriter {
    client: Arc<Client>,
    sender: Option<(JoinHandle<Result<()>>, PartSender)>,
    tmp_filename: String,
    final_filename: String,
}

impl HdfsMultipartWriter {
    fn new(
        client: Arc<Client>,
        writer: FileWriter,
        tmp_filename: &str,
        final_filename: &str,
    ) -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();

        Self {
            client,
            sender: Some((Self::start_writer_task(writer, receiver), sender)),
            tmp_filename: tmp_filename.to_string(),
            final_filename: final_filename.to_string(),
        }
    }

    fn start_writer_task(
        mut writer: FileWriter,
        mut part_receiver: mpsc::UnboundedReceiver<(oneshot::Sender<Result<()>>, PutPayload)>,
    ) -> JoinHandle<Result<()>> {
        task::spawn(async move {
            'outer: loop {
                match part_receiver.recv().await {
                    Some((sender, part)) => {
                        for bytes in part {
                            if let Err(e) = writer.write(bytes).await.to_object_store_err() {
                                let _ = sender.send(Err(e));
                                break 'outer;
                            }
                        }
                        let _ = sender.send(Ok(()));
                    }
                    None => {
                        return writer.close().await.to_object_store_err();
                    }
                }
            }

            // If we've reached here, a write failed, so just return errors for all new parts that come in
            while let Some((sender, _)) = part_receiver.recv().await {
                let _ = sender.send(
                    Err(HdfsError::OperationFailed(
                        "Write failed during one of the parts".to_string(),
                    ))
                    .to_object_store_err(),
                );
            }
            Err(HdfsError::OperationFailed(
                "Write failed during one of the parts".to_string(),
            ))
            .to_object_store_err()
        })
    }
}

impl std::fmt::Debug for HdfsMultipartWriter {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HdfsMultipartWriter")
            .field("tmp_filename", &self.tmp_filename)
            .field("final_filename", &self.final_filename)
            .finish()
    }
}

#[async_trait]
impl MultipartUpload for HdfsMultipartWriter {
    fn put_part(&mut self, payload: PutPayload) -> UploadPart {
        let (result_sender, result_receiver) = oneshot::channel();

        if let Some((_, payload_sender)) = self.sender.as_ref() {
            payload_sender.send((result_sender, payload)).unwrap();
        } else {
            result_sender
                .send(
                    Err(HdfsError::OperationFailed(
                        "Cannot put part after completing or aborting".to_string(),
                    ))
                    .to_object_store_err(),
                )
                .unwrap();
        }

        async { result_receiver.await.unwrap() }.boxed()
    }

    async fn complete(&mut self) -> Result<PutResult> {
        // Drop the sender so the task knows no more data is coming
        if let Some((handle, sender)) = self.sender.take() {
            drop(sender);

            // Wait for the writer task to finish
            handle.await??;

            self.client
                .rename(&self.tmp_filename, &self.final_filename, true)
                .await
                .to_object_store_err()?;

            Ok(PutResult {
                e_tag: None,
                version: None,
            })
        } else {
            Err(object_store::Error::NotSupported {
                source: "Cannot call abort or complete multiple times".into(),
            })
        }
    }

    async fn abort(&mut self) -> Result<()> {
        // Drop the sender so the task knows no more data is coming
        if let Some((handle, sender)) = self.sender.take() {
            drop(sender);

            // Abort the writer task rather than waiting for it to finish
            handle.abort();

            self.client
                .delete(&self.tmp_filename, false)
                .await
                .to_object_store_err()?;

            Ok(())
        } else {
            Err(object_store::Error::NotSupported {
                source: "Cannot call abort or complete multiple times".into(),
            })
        }
    }
}

/// ObjectStore paths always remove the leading slash, so add it back
fn make_absolute_file(path: &Path) -> String {
    format!("/{}", path.as_ref())
}

/// Adds the leading slash back and appends a trailing slash for directory listings;
/// an empty path maps to the root "/"
fn make_absolute_dir(path: &Path) -> String {
    if path.parts().count() > 0 {
        format!("/{}/", path.as_ref())
    } else {
        "/".to_string()
    }
}

fn get_object_meta(status: &FileStatus) -> Result<ObjectMeta> {
    Ok(ObjectMeta {
        location: Path::parse(&status.path)?,
        last_modified: DateTime::<Utc>::from_timestamp_millis(status.modification_time as i64)
            .unwrap(),
        size: status.length,
        e_tag: None,
        version: None,
    })
}