hdfs_native_object_store/lib.rs

// #![warn(missing_docs)]
//! [object_store::ObjectStore] implementation for the Native Rust HDFS client
//!
//! # Usage
//!
//! ```rust
//! use hdfs_native_object_store::HdfsObjectStore;
//! # use object_store::Result;
//! # fn main() -> Result<()> {
//! let store = HdfsObjectStore::with_url("hdfs://localhost:9000")?;
//! # Ok(())
//! # }
//! ```
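//!
//! Writing and reading an object (a minimal sketch; marked `no_run` because it
//! assumes a reachable NameNode at `hdfs://localhost:9000` and a hypothetical
//! `tmp/hello.txt` path):
//!
//! ```rust,no_run
//! # use object_store::{path::Path, ObjectStore, PutPayload, Result};
//! # use hdfs_native_object_store::HdfsObjectStore;
//! # async fn example() -> Result<()> {
//! let store = HdfsObjectStore::with_url("hdfs://localhost:9000")?;
//! let path = Path::from("tmp/hello.txt");
//! store.put(&path, PutPayload::from("hello world")).await?;
//! let bytes = store.get(&path).await?.bytes().await?;
//! assert_eq!(bytes, "hello world");
//! # Ok(())
//! # }
//! ```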
//!
use std::{
    collections::HashMap,
    fmt::{Display, Formatter},
    future,
    path::PathBuf,
    sync::Arc,
};

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use futures::{
    stream::{BoxStream, StreamExt},
    FutureExt,
};
use hdfs_native::{client::FileStatus, file::FileWriter, Client, HdfsError, WriteOptions};
use object_store::{
    path::Path, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload,
    ObjectMeta, ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
    UploadPart,
};
use tokio::{
    sync::{mpsc, oneshot},
    task::{self, JoinHandle},
};

// Re-export minidfs for downstream integration tests
#[cfg(feature = "integration-test")]
pub use hdfs_native::minidfs;

/// An [ObjectStore] implementation backed by the native Rust HDFS [Client].
#[derive(Debug)]
pub struct HdfsObjectStore {
    client: Arc<Client>,
}

impl HdfsObjectStore {
    /// Creates a new HdfsObjectStore from an existing [Client]
    ///
    /// ```rust
    /// # use std::sync::Arc;
    /// use hdfs_native::Client;
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// let client = Client::new("hdfs://127.0.0.1:9000").unwrap();
    /// let store = HdfsObjectStore::new(Arc::new(client));
    /// ```
    pub fn new(client: Arc<Client>) -> Self {
        Self { client }
    }

    /// Creates a new HdfsObjectStore using the specified URL
    ///
    /// Connect to a NameNode
    /// ```rust
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # fn main() -> object_store::Result<()> {
    /// let store = HdfsObjectStore::with_url("hdfs://127.0.0.1:9000")?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_url(url: &str) -> Result<Self> {
        Ok(Self::new(Arc::new(Client::new(url).to_object_store_err()?)))
    }

    /// Creates a new HdfsObjectStore using the specified URL and Hadoop configs.
    ///
    /// Connect to a NameService
    /// ```rust
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # use std::collections::HashMap;
    /// # fn main() -> object_store::Result<()> {
    /// let config = HashMap::from([
    ///     ("dfs.ha.namenodes.ns".to_string(), "nn1,nn2".to_string()),
    ///     ("dfs.namenode.rpc-address.ns.nn1".to_string(), "nn1.example.com:9000".to_string()),
    ///     ("dfs.namenode.rpc-address.ns.nn2".to_string(), "nn2.example.com:9000".to_string()),
    /// ]);
    /// let store = HdfsObjectStore::with_config("hdfs://ns", config)?;
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
        Ok(Self::new(Arc::new(
            Client::new_with_config(url, config).to_object_store_err()?,
        )))
    }

    /// Copies a file by streaming the contents of `from` into a newly created
    /// file at `to`. If `overwrite` is false and `to` already exists, returns
    /// an `AlreadyExists` error.
    async fn internal_copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
        // Only request an overwriting create if the destination actually exists
        let overwrite = match self.client.get_file_info(&make_absolute_file(to)).await {
            Ok(_) if overwrite => true,
            Ok(_) => Err(HdfsError::AlreadyExists(make_absolute_file(to))).to_object_store_err()?,
            Err(HdfsError::FileNotFound(_)) => false,
            Err(e) => Err(e).to_object_store_err()?,
        };

        let write_options = WriteOptions {
            overwrite,
            ..Default::default()
        };

        let file = self
            .client
            .read(&make_absolute_file(from))
            .await
            .to_object_store_err()?;
        let mut stream = file.read_range_stream(0, file.file_length()).boxed();

        let mut new_file = self
            .client
            .create(&make_absolute_file(to), write_options)
            .await
            .to_object_store_err()?;

        while let Some(bytes) = stream.next().await.transpose().to_object_store_err()? {
            new_file.write(bytes).await.to_object_store_err()?;
        }
        new_file.close().await.to_object_store_err()?;

        Ok(())
    }

    /// Opens a hidden temporary file `.{filename}.tmp.{i}` in the same directory
    /// as `file_path`, where `i` is the first index that doesn't collide with an
    /// existing file. Returns the writer and the temporary file's path.
    async fn open_tmp_file(&self, file_path: &str) -> Result<(FileWriter, String)> {
        let path_buf = PathBuf::from(file_path);

        let file_name = path_buf
            .file_name()
            .ok_or(HdfsError::InvalidPath("path missing filename".to_string()))
            .to_object_store_err()?
            .to_str()
            .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
            .to_object_store_err()?
            .to_string();

        let tmp_file_path = path_buf
            .with_file_name(format!(".{}.tmp", file_name))
            .to_str()
            .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
            .to_object_store_err()?
            .to_string();

        // Try to create a file with an incrementing index until we find one that doesn't exist yet
        let mut index = 1;
        loop {
            let path = format!("{}.{}", tmp_file_path, index);
            match self.client.create(&path, WriteOptions::default()).await {
                Ok(writer) => break Ok((writer, path)),
                Err(HdfsError::AlreadyExists(_)) => index += 1,
                Err(e) => break Err(e).to_object_store_err(),
            }
        }
    }
}

impl Display for HdfsObjectStore {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "HdfsObjectStore")
    }
}

impl From<Client> for HdfsObjectStore {
    fn from(value: Client) -> Self {
        Self::new(Arc::new(value))
    }
}

#[async_trait]
impl ObjectStore for HdfsObjectStore {
    /// Save the provided bytes to the specified location
    ///
    /// To make the operation atomic, we write to a temporary file `.{filename}.tmp.{i}` and rename
    /// on a successful write, where `i` is an integer that is incremented until a non-existent file
    /// is found.
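    ///
    /// A usage sketch (`no_run`; assumes a reachable cluster and a hypothetical
    /// `data/new.bin` path):
    ///
    /// ```rust,no_run
    /// # use object_store::{path::Path, ObjectStore, PutMode, PutOptions, PutPayload};
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # async fn example(store: HdfsObjectStore) -> object_store::Result<()> {
    /// let opts = PutOptions {
    ///     mode: PutMode::Create, // fail with AlreadyExists if the file exists
    ///     ..Default::default()
    /// };
    /// store
    ///     .put_opts(&Path::from("data/new.bin"), PutPayload::from("bytes"), opts)
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```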
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: PutOptions,
    ) -> Result<PutResult> {
        let overwrite = match opts.mode {
            PutMode::Create => false,
            PutMode::Overwrite => true,
            PutMode::Update(_) => {
                return Err(object_store::Error::NotSupported {
                    source: "Update mode not supported".to_string().into(),
                })
            }
        };

        let final_file_path = make_absolute_file(location);

        // If we're not overwriting, do an upfront check to see if the file already
        // exists. Otherwise we have to write the whole file and try to rename before
        // finding out.
        if !overwrite && self.client.get_file_info(&final_file_path).await.is_ok() {
            return Err(HdfsError::AlreadyExists(final_file_path)).to_object_store_err();
        }

        let (mut tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;

        for bytes in payload {
            tmp_file.write(bytes).await.to_object_store_err()?;
        }
        tmp_file.close().await.to_object_store_err()?;

        self.client
            .rename(&tmp_file_path, &final_file_path, overwrite)
            .await
            .to_object_store_err()?;

        Ok(PutResult {
            e_tag: None,
            version: None,
        })
    }

    /// Create a multipart writer that writes to a temporary file in a background task, and renames
    /// to the final destination on complete.
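    ///
    /// A sketch of streaming a write in parts (`no_run`; assumes a reachable
    /// cluster and a hypothetical `data/large.bin` path):
    ///
    /// ```rust,no_run
    /// # use object_store::{path::Path, ObjectStore, PutPayload};
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # async fn example(store: HdfsObjectStore) -> object_store::Result<()> {
    /// let mut upload = store.put_multipart(&Path::from("data/large.bin")).await?;
    /// upload.put_part(PutPayload::from("part 1")).await?;
    /// upload.put_part(PutPayload::from("part 2")).await?;
    /// upload.complete().await?; // renames the temp file to the final path
    /// # Ok(())
    /// # }
    /// ```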
    async fn put_multipart_opts(
        &self,
        location: &Path,
        _opts: PutMultipartOpts,
    ) -> Result<Box<dyn MultipartUpload>> {
        let final_file_path = make_absolute_file(location);

        let (tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;

        Ok(Box::new(HdfsMultipartWriter::new(
            Arc::clone(&self.client),
            tmp_file,
            &tmp_file_path,
            &final_file_path,
        )))
    }

    /// Reads data for the specified location.
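    ///
    /// Conditional options (`if_match`, `if_none_match`, `if_modified_since`,
    /// `if_unmodified_since`) are not supported, but range requests are. A sketch
    /// of reading the first kilobyte (`no_run`; hypothetical path):
    ///
    /// ```rust,no_run
    /// # use object_store::{path::Path, GetOptions, GetRange, ObjectStore};
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # async fn example(store: HdfsObjectStore) -> object_store::Result<()> {
    /// let options = GetOptions {
    ///     range: Some(GetRange::Bounded(0..1024)),
    ///     ..Default::default()
    /// };
    /// let bytes = store
    ///     .get_opts(&Path::from("data/file.bin"), options)
    ///     .await?
    ///     .bytes()
    ///     .await?;
    /// # Ok(())
    /// # }
    /// ```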
    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
        if options.if_match.is_some()
            || options.if_none_match.is_some()
            || options.if_modified_since.is_some()
            || options.if_unmodified_since.is_some()
        {
            return Err(object_store::Error::NotImplemented);
        }

        let meta = self.head(location).await?;

        let range = options
            .range
            .map(|r| match r {
                GetRange::Bounded(range) => range,
                GetRange::Offset(offset) => offset..meta.size,
                GetRange::Suffix(suffix) => meta.size.saturating_sub(suffix)..meta.size,
            })
            .unwrap_or(0..meta.size);

        let reader = self
            .client
            .read(&make_absolute_file(location))
            .await
            .to_object_store_err()?;
        let stream = reader
            .read_range_stream(range.start, range.end - range.start)
            .map(|b| b.to_object_store_err())
            .boxed();

        let payload = GetResultPayload::Stream(stream);

        Ok(GetResult {
            payload,
            meta,
            range,
            attributes: Default::default(),
        })
    }

    /// Return the metadata for the specified location
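    ///
    /// A sketch (`no_run`; hypothetical path):
    ///
    /// ```rust,no_run
    /// # use object_store::{path::Path, ObjectStore};
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # async fn example(store: HdfsObjectStore) -> object_store::Result<()> {
    /// let meta = store.head(&Path::from("data/file.bin")).await?;
    /// println!("{} is {} bytes", meta.location, meta.size);
    /// # Ok(())
    /// # }
    /// ```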
    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
        let status = self
            .client
            .get_file_info(&make_absolute_file(location))
            .await
            .to_object_store_err()?;

        if status.isdir {
            return Err(HdfsError::IsADirectoryError(
                "Head must be called on a file".to_string(),
            ))
            .to_object_store_err();
        }

        Ok(ObjectMeta {
            location: location.clone(),
            // HDFS reports modification times in milliseconds since the epoch
            last_modified: DateTime::<Utc>::from_timestamp_millis(status.modification_time as i64)
                .unwrap(),
            size: status.length,
            e_tag: None,
            version: None,
        })
    }

    /// Delete the object at the specified location.
    async fn delete(&self, location: &Path) -> Result<()> {
        let result = self
            .client
            .delete(&make_absolute_file(location), false)
            .await
            .to_object_store_err()?;

        if !result {
            Err(HdfsError::OperationFailed(
                "failed to delete object".to_string(),
            ))
            .to_object_store_err()?
        }

        Ok(())
    }

    /// List all the objects with the given prefix.
    ///
    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
    /// `foo/bar_baz/x`.
    ///
    /// Note: the order of returned [`ObjectMeta`] is not guaranteed
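    ///
    /// A sketch of collecting every object under a prefix (`no_run`; hypothetical
    /// `data` prefix):
    ///
    /// ```rust,no_run
    /// # use futures::TryStreamExt;
    /// # use object_store::{path::Path, ObjectStore};
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # async fn example(store: HdfsObjectStore) -> object_store::Result<()> {
    /// let objects: Vec<_> = store.list(Some(&Path::from("data"))).try_collect().await?;
    /// for meta in &objects {
    ///     println!("{}: {} bytes", meta.location, meta.size);
    /// }
    /// # Ok(())
    /// # }
    /// ```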
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
        let status_stream = self
            .client
            .list_status_iter(
                &prefix.map(make_absolute_dir).unwrap_or("".to_string()),
                true,
            )
            .into_stream()
            .filter(|res| {
                let result = match res {
                    Ok(status) => !status.isdir,
                    // Listing by prefix should just return an empty list if the prefix isn't found
                    Err(HdfsError::FileNotFound(_)) => false,
                    _ => true,
                };
                future::ready(result)
            })
            .map(|res| res.map_or_else(|e| Err(e).to_object_store_err(), |s| get_object_meta(&s)));

        Box::pin(status_stream)
    }

    /// List objects with the given prefix and an implementation specific
    /// delimiter. Returns common prefixes (directories) in addition to object
    /// metadata.
    ///
    /// Prefixes are evaluated on a path segment basis, i.e. `foo/bar/` is a prefix of `foo/bar/x` but not of
    /// `foo/bar_baz/x`.
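    ///
    /// A sketch of a non-recursive listing (`no_run`; hypothetical `data` prefix):
    ///
    /// ```rust,no_run
    /// # use object_store::{path::Path, ObjectStore};
    /// # use hdfs_native_object_store::HdfsObjectStore;
    /// # async fn example(store: HdfsObjectStore) -> object_store::Result<()> {
    /// let listing = store.list_with_delimiter(Some(&Path::from("data"))).await?;
    /// println!("{} dirs, {} files", listing.common_prefixes.len(), listing.objects.len());
    /// # Ok(())
    /// # }
    /// ```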
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
        let mut status_stream = self
            .client
            .list_status_iter(
                &prefix.map(make_absolute_dir).unwrap_or("".to_string()),
                false,
            )
            .into_stream()
            .filter(|res| {
                let result = match res {
                    // Listing by prefix should just return an empty list if the prefix isn't found
                    Err(HdfsError::FileNotFound(_)) => false,
                    _ => true,
                };
                future::ready(result)
            });

        let mut statuses = Vec::<FileStatus>::new();
        while let Some(status) = status_stream.next().await {
            statuses.push(status.to_object_store_err()?);
        }

        let mut dirs: Vec<Path> = Vec::new();
        for status in statuses.iter().filter(|s| s.isdir) {
            dirs.push(Path::parse(&status.path)?)
        }

        let mut files: Vec<ObjectMeta> = Vec::new();
        for status in statuses.iter().filter(|s| !s.isdir) {
            files.push(get_object_meta(status)?)
        }

        Ok(ListResult {
            common_prefixes: dirs,
            objects: files,
        })
    }

    /// Renames a file. This operation is guaranteed to be atomic.
    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
        Ok(self
            .client
            .rename(&make_absolute_file(from), &make_absolute_file(to), true)
            .await
            .to_object_store_err()?)
    }

    /// Renames a file only if the destination doesn't exist. This operation is guaranteed
    /// to be atomic.
    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        Ok(self
            .client
            .rename(&make_absolute_file(from), &make_absolute_file(to), false)
            .await
            .to_object_store_err()?)
    }

    /// Copy an object from one path to another in the same object store.
    ///
    /// If there exists an object at the destination, it will be overwritten.
    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
        self.internal_copy(from, to, true).await
    }

    /// Copy an object from one path to another, only if destination is empty.
    ///
    /// Will return an error if the destination already has an object.
    ///
    /// Performs an atomic operation if the underlying object storage supports it.
    /// If atomic operations are not supported by the underlying object storage (like S3)
    /// it will return an error.
    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        self.internal_copy(from, to, false).await
    }
}

/// Converts [hdfs_native::Result] into [object_store::Result], mapping HDFS
/// errors to their object_store equivalents where possible.
#[cfg(feature = "integration-test")]
pub trait HdfsErrorConvert<T> {
    fn to_object_store_err(self) -> Result<T>;
}

#[cfg(not(feature = "integration-test"))]
trait HdfsErrorConvert<T> {
    fn to_object_store_err(self) -> Result<T>;
}

impl<T> HdfsErrorConvert<T> for hdfs_native::Result<T> {
    fn to_object_store_err(self) -> Result<T> {
        self.map_err(|err| match err {
            HdfsError::FileNotFound(path) => object_store::Error::NotFound {
                path: path.clone(),
                source: Box::new(HdfsError::FileNotFound(path)),
            },
            HdfsError::AlreadyExists(path) => object_store::Error::AlreadyExists {
                path: path.clone(),
                source: Box::new(HdfsError::AlreadyExists(path)),
            },
            _ => object_store::Error::Generic {
                store: "HdfsObjectStore",
                source: Box::new(err),
            },
        })
    }
}

// Queue entries pair each part's payload with a oneshot channel used to report
// the result of writing that part
type PartSender = mpsc::UnboundedSender<(oneshot::Sender<Result<()>>, PutPayload)>;

// A "fake" multipart writer that uploads parts to a temp file from a background
// task, submitting new parts to that task through a queue. A oneshot channel
// per part signals whether the part finished writing successfully.
// On complete, the temp file is renamed to the actual target.
struct HdfsMultipartWriter {
    client: Arc<Client>,
    sender: Option<(JoinHandle<Result<()>>, PartSender)>,
    tmp_filename: String,
    final_filename: String,
}

impl HdfsMultipartWriter {
    fn new(
        client: Arc<Client>,
        writer: FileWriter,
        tmp_filename: &str,
        final_filename: &str,
    ) -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();

        Self {
            client,
            sender: Some((Self::start_writer_task(writer, receiver), sender)),
            tmp_filename: tmp_filename.to_string(),
            final_filename: final_filename.to_string(),
        }
    }

    fn start_writer_task(
        mut writer: FileWriter,
        mut part_receiver: mpsc::UnboundedReceiver<(oneshot::Sender<Result<()>>, PutPayload)>,
    ) -> JoinHandle<Result<()>> {
        task::spawn(async move {
            'outer: loop {
                match part_receiver.recv().await {
                    Some((sender, part)) => {
                        for bytes in part {
                            if let Err(e) = writer.write(bytes).await.to_object_store_err() {
                                let _ = sender.send(Err(e));
                                break 'outer;
                            }
                        }
                        let _ = sender.send(Ok(()));
                    }
                    None => {
                        return writer.close().await.to_object_store_err();
                    }
                }
            }

            // If we've reached here, a write task failed, so just return errors for all new parts that come in
            while let Some((sender, _)) = part_receiver.recv().await {
                let _ = sender.send(
                    Err(HdfsError::OperationFailed(
                        "Write failed during one of the parts".to_string(),
                    ))
                    .to_object_store_err(),
                );
            }
            Err(HdfsError::OperationFailed(
                "Write failed during one of the parts".to_string(),
            ))
            .to_object_store_err()
        })
    }
}

impl std::fmt::Debug for HdfsMultipartWriter {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("HdfsMultipartWriter")
            .field("tmp_filename", &self.tmp_filename)
            .field("final_filename", &self.final_filename)
            .finish()
    }
}

#[async_trait]
impl MultipartUpload for HdfsMultipartWriter {
    fn put_part(&mut self, payload: PutPayload) -> UploadPart {
        let (result_sender, result_receiver) = oneshot::channel();

        if let Some((_, payload_sender)) = self.sender.as_ref() {
            payload_sender.send((result_sender, payload)).unwrap();
        } else {
            result_sender
                .send(
                    Err(HdfsError::OperationFailed(
                        "Cannot put part after completing or aborting".to_string(),
                    ))
                    .to_object_store_err(),
                )
                .unwrap();
        }

        async { result_receiver.await.unwrap() }.boxed()
    }

    async fn complete(&mut self) -> Result<PutResult> {
        // Drop the sender so the task knows no more data is coming
        if let Some((handle, sender)) = self.sender.take() {
            drop(sender);

            // Wait for the writer task to finish
            handle.await??;

            self.client
                .rename(&self.tmp_filename, &self.final_filename, true)
                .await
                .to_object_store_err()?;

            Ok(PutResult {
                e_tag: None,
                version: None,
            })
        } else {
            Err(object_store::Error::NotSupported {
                source: "Cannot call abort or complete multiple times".into(),
            })
        }
    }

    async fn abort(&mut self) -> Result<()> {
        // Drop the sender so the task knows no more data is coming
        if let Some((handle, sender)) = self.sender.take() {
            drop(sender);

            // Abort the writer task, discarding any partially written data
            handle.abort();

            self.client
                .delete(&self.tmp_filename, false)
                .await
                .to_object_store_err()?;

            Ok(())
        } else {
            Err(object_store::Error::NotSupported {
                source: "Cannot call abort or complete multiple times".into(),
            })
        }
    }
}

/// ObjectStore paths always remove the leading slash, so add it back
fn make_absolute_file(path: &Path) -> String {
    format!("/{}", path.as_ref())
}

/// Converts a prefix path into an absolute directory path with a trailing slash
fn make_absolute_dir(path: &Path) -> String {
    if path.parts().count() > 0 {
        format!("/{}/", path.as_ref())
    } else {
        "/".to_string()
    }
}

fn get_object_meta(status: &FileStatus) -> Result<ObjectMeta> {
    Ok(ObjectMeta {
        location: Path::parse(&status.path)?,
        // HDFS reports modification times in milliseconds since the epoch
        last_modified: DateTime::<Utc>::from_timestamp_millis(status.modification_time as i64)
            .unwrap(),
        size: status.length,
        e_tag: None,
        version: None,
    })
}