1use std::{
14 collections::HashMap,
15 fmt::{Display, Formatter},
16 future,
17 path::PathBuf,
18};
19
20use async_trait::async_trait;
21use chrono::{DateTime, Utc};
22use futures::{
23 FutureExt,
24 stream::{BoxStream, StreamExt},
25};
26use hdfs_native::{
27 Client, ClientBuilder, HdfsError, WriteOptions, client::FileStatus, file::FileWriter,
28};
29use object_store::{CopyMode, CopyOptions, RenameOptions, RenameTargetMode};
30#[allow(deprecated)]
31use object_store::{
32 GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta, ObjectStore,
33 PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart, path::Path,
34};
35use tokio::{
36 runtime::Handle,
37 sync::{mpsc, oneshot},
38 task::{self, JoinHandle},
39};
40
41#[cfg(feature = "integration-test")]
43pub use hdfs_native::minidfs;
44
45fn generic_error(
46 source: Box<dyn std::error::Error + Send + Sync + 'static>,
47) -> object_store::Error {
48 object_store::Error::Generic {
49 store: "HFDS",
50 source,
51 }
52}
53
/// Builder for configuring and constructing an `HdfsObjectStore`.
///
/// All settings are delegated to the wrapped `hdfs_native::ClientBuilder`.
#[derive(Default)]
pub struct HdfsObjectStoreBuilder {
    // Underlying client builder that accumulates the configuration.
    inner: ClientBuilder,
}
59
60impl HdfsObjectStoreBuilder {
61 pub fn new() -> Self {
63 Self::default()
64 }
65
66 pub fn with_url(mut self, url: impl Into<String>) -> Self {
68 self.inner = self.inner.with_url(url);
69 self
70 }
71
72 pub fn with_config(
74 mut self,
75 config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
76 ) -> Self {
77 self.inner = self.inner.with_config(config);
78 self
79 }
80
81 pub fn with_io_runtime(mut self, runtime: Handle) -> Self {
83 self.inner = self.inner.with_io_runtime(runtime);
84 self
85 }
86
87 pub fn build(self) -> Result<HdfsObjectStore> {
89 let client = self.inner.build().to_object_store_err()?;
90
91 Ok(HdfsObjectStore { client })
92 }
93}
94
/// An `ObjectStore` implementation backed by HDFS through the native
/// `hdfs_native` [`Client`].
#[derive(Debug, Clone)]
pub struct HdfsObjectStore {
    // Cloneable handle to the underlying HDFS client.
    client: Client,
}
100
101impl HdfsObjectStore {
102 pub fn new(client: Client) -> Self {
112 Self { client }
113 }
114
115 #[deprecated(since = "0.15.0", note = "Use HdfsObjectStoreBuilder instead")]
126 pub fn with_url(url: &str) -> Result<Self> {
127 let client = ClientBuilder::new()
128 .with_url(url)
129 .build()
130 .to_object_store_err()?;
131
132 Ok(Self { client })
133 }
134
135 #[deprecated(since = "0.15.0", note = "Use HdfsObjectStoreBuilder instead")]
152 pub fn with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
153 let client = ClientBuilder::new()
154 .with_url(url)
155 .with_config(config)
156 .build()
157 .to_object_store_err()?;
158
159 Ok(Self { client })
160 }
161
162 async fn open_tmp_file(&self, file_path: &str) -> Result<(FileWriter, String)> {
163 let path_buf = PathBuf::from(file_path);
164
165 let file_name = path_buf
166 .file_name()
167 .ok_or(HdfsError::InvalidPath("path missing filename".to_string()))
168 .to_object_store_err()?
169 .to_str()
170 .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
171 .to_object_store_err()?
172 .to_string();
173
174 let tmp_file_path = path_buf
175 .with_file_name(format!(".{file_name}.tmp"))
176 .to_str()
177 .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
178 .to_object_store_err()?
179 .to_string();
180
181 let mut index = 1;
183 loop {
184 let path = format!("{tmp_file_path}.{index}");
185 match self.client.create(&path, WriteOptions::default()).await {
186 Ok(writer) => break Ok((writer, path)),
187 Err(HdfsError::AlreadyExists(_)) => index += 1,
188 Err(e) => break Err(e).to_object_store_err(),
189 }
190 }
191 }
192}
193
194impl Display for HdfsObjectStore {
195 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
196 write!(f, "HdfsObjectStore")
197 }
198}
199
200impl From<Client> for HdfsObjectStore {
201 fn from(value: Client) -> Self {
202 Self { client: value }
203 }
204}
205
#[async_trait]
impl ObjectStore for HdfsObjectStore {
    /// Uploads `payload` to `location`.
    ///
    /// Data is staged in a hidden temp file first and then renamed over the
    /// final path, so readers never observe a partially written object.
    /// `PutMode::Update` is not supported.
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: PutOptions,
    ) -> Result<PutResult> {
        let overwrite = match opts.mode {
            PutMode::Create => false,
            PutMode::Overwrite => true,
            // Conditional updates would require a compare-and-swap that this
            // implementation does not provide.
            PutMode::Update(_) => {
                return Err(object_store::Error::NotImplemented {
                    operation: "PutOptions with Update precondition".to_string(),
                    implementer: "HdfsObjectStore".to_string(),
                });
            }
        };

        let final_file_path = make_absolute_file(location);

        // NOTE(review): this existence check and the rename below are not
        // atomic (TOCTOU window), and any get_file_info error — not just
        // "not found" — is treated as "file absent"; confirm acceptable.
        if !overwrite && self.client.get_file_info(&final_file_path).await.is_ok() {
            return Err(HdfsError::AlreadyExists(final_file_path)).to_object_store_err();
        }

        let (mut tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;

        // PutPayload iterates as a sequence of byte chunks.
        for bytes in payload {
            tmp_file.write(bytes).await.to_object_store_err()?;
        }
        tmp_file.close().await.to_object_store_err()?;

        // Move the staged file into place.
        self.client
            .rename(&tmp_file_path, &final_file_path, overwrite)
            .await
            .to_object_store_err()?;

        // Re-read metadata so the returned etag reflects the final file.
        let e_tag = self
            .get_opts(location, GetOptions::default().with_head(true))
            .await?
            .meta
            .e_tag;

        Ok(PutResult {
            e_tag,
            version: None,
        })
    }

    /// Starts a multipart upload: parts are streamed to a hidden temp file by
    /// a background task and the file is renamed into place on `complete`.
    /// The supplied options are currently ignored.
    #[allow(deprecated)]
    async fn put_multipart_opts(
        &self,
        location: &Path,
        _opts: PutMultipartOpts,
    ) -> Result<Box<dyn MultipartUpload>> {
        let final_file_path = make_absolute_file(location);

        let (tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;

        Ok(Box::new(HdfsMultipartWriter::new(
            self.client.clone(),
            tmp_file,
            &tmp_file_path,
            &final_file_path,
        )))
    }

    /// Fetches an object (or just its metadata when `options.head` is set).
    ///
    /// # Errors
    /// Returns `NotFound` for directories and missing files; precondition
    /// failures from `options` are propagated.
    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
        let status = self
            .client
            .get_file_info(&make_absolute_file(location))
            .await
            .to_object_store_err()?;

        // Directories are not objects; report them as NotFound.
        if status.isdir {
            return Err(object_store::Error::NotFound {
                path: location.to_string(),
                source: "Head cannot be called on a directory".into(),
            });
        }

        let meta = get_object_meta(&status)?;

        // Enforce if-match / if-modified style preconditions before reading.
        options.check_preconditions(&meta)?;

        let (range, payload) = if options.head {
            // HEAD: empty range and an empty stream, metadata only.
            (
                (0..0),
                GetResultPayload::Stream(futures::stream::empty().boxed()),
            )
        } else {
            // Resolve the requested range against the file size; default to
            // the whole file when no range was given.
            let range = options
                .range
                .map(|r| r.as_range(meta.size))
                .transpose()
                .map_err(|source| generic_error(source.into()))?
                .unwrap_or(0..meta.size);

            let reader = self
                .client
                .read(&make_absolute_file(location))
                .await
                .to_object_store_err()?;
            // u64 -> usize conversions; only fail on 32-bit targets with
            // ranges beyond usize::MAX, treated as a bug.
            let start: usize = range
                .start
                .try_into()
                .expect("unable to convert range.start to usize");
            let end: usize = range
                .end
                .try_into()
                .expect("unable to convert range.end to usize");
            let stream = reader
                .read_range_stream(start, end - start)
                .map(|b| b.to_object_store_err())
                .boxed();

            (range, GetResultPayload::Stream(stream))
        };

        Ok(GetResult {
            payload,
            meta,
            range,
            attributes: Default::default(),
        })
    }

    /// Deletes each incoming location, yielding the path back on success.
    /// Deletes are non-recursive and run up to 10 at a time.
    fn delete_stream(
        &self,
        locations: BoxStream<'static, Result<Path>>,
    ) -> BoxStream<'static, Result<Path>> {
        let client = self.client.clone();
        locations
            .map(move |location| {
                let client = client.clone();
                async move {
                    let location = location?;
                    client
                        .delete(&make_absolute_file(&location), false)
                        .await
                        .to_object_store_err()?;
                    Ok(location)
                }
            })
            .buffered(10)
            .boxed()
    }

    /// Recursively lists all files under `prefix` (default: the root).
    ///
    /// Directories and the prefix itself are filtered out; a missing prefix
    /// yields an empty stream rather than an error.
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
        let absolute_dir = prefix.map(make_absolute_file).unwrap_or("/".to_string());

        let status_stream = self
            .client
            .list_status_iter(&absolute_dir, true)
            .into_stream()
            .filter(move |res| {
                let result = match res {
                    // Keep files only, and never echo the prefix entry back.
                    Ok(status) => !status.isdir && status.path != absolute_dir,
                    // Nonexistent prefix -> empty listing, not an error.
                    Err(HdfsError::FileNotFound(_)) => false,
                    _ => true,
                };
                future::ready(result)
            })
            .map(|res| res.map_or_else(|e| Err(e).to_object_store_err(), |s| get_object_meta(&s)));

        Box::pin(status_stream)
    }

    /// Lists the immediate children of `prefix`, returning subdirectories as
    /// common prefixes and files as objects.
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
        let absolute_dir = prefix.map(make_absolute_file).unwrap_or("/".to_string());

        // Non-recursive listing (second arg false), skipping the prefix entry
        // itself and treating a missing prefix as empty.
        let mut status_stream = self
            .client
            .list_status_iter(&absolute_dir, false)
            .into_stream()
            .filter(move |res| {
                let result = match res {
                    Ok(status) => status.path != absolute_dir,
                    Err(HdfsError::FileNotFound(_)) => false,
                    _ => true,
                };
                future::ready(result)
            });

        let mut statuses = Vec::<FileStatus>::new();
        while let Some(status) = status_stream.next().await {
            statuses.push(status.to_object_store_err()?);
        }

        let mut dirs: Vec<Path> = Vec::new();
        for status in statuses.iter().filter(|s| s.isdir) {
            dirs.push(Path::parse(&status.path)?)
        }

        let mut files: Vec<ObjectMeta> = Vec::new();
        for status in statuses.iter().filter(|s| !s.isdir) {
            files.push(get_object_meta(status)?)
        }

        Ok(ListResult {
            common_prefixes: dirs,
            objects: files,
        })
    }

    /// Renames `from` to `to`, creating any missing parent directories of the
    /// target (mode 0o755) first. Overwrite behaviour follows
    /// `options.target_mode`.
    async fn rename_opts(&self, from: &Path, to: &Path, options: RenameOptions) -> Result<()> {
        // Everything but the final component is the parent directory.
        let mut parent: Vec<_> = to.parts().collect();
        parent.pop();

        if !parent.is_empty() {
            let parent_path: Path = parent.into_iter().collect();
            self.client
                .mkdirs(&make_absolute_dir(&parent_path), 0o755, true)
                .await
                .to_object_store_err()?;
        }

        let overwrite = match options.target_mode {
            RenameTargetMode::Overwrite => true,
            RenameTargetMode::Create => false,
        };

        Ok(self
            .client
            .rename(
                &make_absolute_file(from),
                &make_absolute_file(to),
                overwrite,
            )
            .await
            .to_object_store_err()?)
    }

    /// Copies `from` to `to` by streaming the whole source file into a newly
    /// created target file.
    ///
    /// In `Create` mode the target's existence is checked up front; note this
    /// check is not atomic with the subsequent create.
    async fn copy_opts(&self, from: &Path, to: &Path, options: CopyOptions) -> Result<()> {
        let overwrite = match options.mode {
            CopyMode::Create => {
                match self.client.get_file_info(&make_absolute_file(to)).await {
                    Ok(_) => {
                        return Err(HdfsError::AlreadyExists(make_absolute_file(to)))
                            .to_object_store_err();
                    }
                    Err(HdfsError::FileNotFound(_)) => false,
                    Err(e) => return Err(e).to_object_store_err(),
                }
            }
            CopyMode::Overwrite => true,
        };

        let write_options = WriteOptions {
            overwrite,
            ..Default::default()
        };

        let file = self
            .client
            .read(&make_absolute_file(from))
            .await
            .to_object_store_err()?;
        // Stream the entire source file from offset 0.
        let mut stream = file.read_range_stream(0, file.file_length()).boxed();

        let mut new_file = self
            .client
            .create(&make_absolute_file(to), write_options)
            .await
            .to_object_store_err()?;

        while let Some(bytes) = stream.next().await.transpose().to_object_store_err()? {
            new_file.write(bytes).await.to_object_store_err()?;
        }
        new_file.close().await.to_object_store_err()?;

        Ok(())
    }
}
512
/// Conversion from `hdfs_native` results into `object_store` results.
trait HdfsErrorConvert<T> {
    /// Maps the error, if any, onto the closest `object_store::Error` variant.
    fn to_object_store_err(self) -> Result<T>;
}
516
517impl<T> HdfsErrorConvert<T> for hdfs_native::Result<T> {
518 fn to_object_store_err(self) -> Result<T> {
519 self.map_err(|err| match err {
520 HdfsError::FileNotFound(path) => object_store::Error::NotFound {
521 path: path.clone(),
522 source: Box::new(HdfsError::FileNotFound(path)),
523 },
524 HdfsError::AlreadyExists(path) => object_store::Error::AlreadyExists {
525 path: path.clone(),
526 source: Box::new(HdfsError::AlreadyExists(path)),
527 },
528 _ => object_store::Error::Generic {
529 store: "HdfsObjectStore",
530 source: Box::new(err),
531 },
532 })
533 }
534}
535
// Channel end used to feed parts to the background writer task; each part is
// paired with a oneshot sender used to report that part's write result.
type PartSender = mpsc::UnboundedSender<(oneshot::Sender<Result<()>>, PutPayload)>;
537
/// `MultipartUpload` implementation that streams parts to a hidden temp file
/// via a background task, then renames the file into place on `complete`.
struct HdfsMultipartWriter {
    client: Client,
    // Background writer task handle plus the channel feeding it parts;
    // `None` once the upload has been completed or aborted.
    sender: Option<(JoinHandle<Result<()>>, PartSender)>,
    tmp_filename: String,
    final_filename: String,
}
548
549impl HdfsMultipartWriter {
550 fn new(client: Client, writer: FileWriter, tmp_filename: &str, final_filename: &str) -> Self {
551 let (sender, receiver) = mpsc::unbounded_channel();
552
553 Self {
554 client,
555 sender: Some((Self::start_writer_task(writer, receiver), sender)),
556 tmp_filename: tmp_filename.to_string(),
557 final_filename: final_filename.to_string(),
558 }
559 }
560
561 fn start_writer_task(
562 mut writer: FileWriter,
563 mut part_receiver: mpsc::UnboundedReceiver<(oneshot::Sender<Result<()>>, PutPayload)>,
564 ) -> JoinHandle<Result<()>> {
565 task::spawn(async move {
566 loop {
567 match part_receiver.recv().await {
568 Some((sender, part)) => {
569 for bytes in part {
570 if let Err(e) = writer.write(bytes).await {
571 let _ = sender.send(Err(e).to_object_store_err());
572 return Err(generic_error("Failed to write all parts".into()));
573 }
574 }
575 let _ = sender.send(Ok(()));
576 }
577 None => return writer.close().await.to_object_store_err(),
578 }
579 }
580 })
581 }
582}
583
584impl std::fmt::Debug for HdfsMultipartWriter {
585 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
586 f.debug_struct("HdfsMultipartWriter")
587 .field("tmp_filename", &self.tmp_filename)
588 .field("final_filename", &self.final_filename)
589 .finish()
590 }
591}
592
#[async_trait]
impl MultipartUpload for HdfsMultipartWriter {
    /// Queues a part for the background writer task and returns a future that
    /// resolves once that part has been written (or failed).
    fn put_part(&mut self, payload: PutPayload) -> UploadPart {
        let (result_sender, result_receiver) = oneshot::channel();

        if let Some((_, payload_sender)) = self.sender.as_ref() {
            // If the writer task has exited, `send` fails and hands the tuple
            // back inside the SendError; recover the oneshot from it so the
            // caller still gets an error instead of a hung future.
            if let Err(mpsc::error::SendError((result_sender, _))) =
                payload_sender.send((result_sender, payload))
            {
                let _ = result_sender.send(Err(generic_error("Write task failed".into())));
            }
        } else {
            // complete()/abort() already ran; reject the part immediately.
            let _ = result_sender.send(Err(generic_error(
                "Cannot put part after completing or aborting".into(),
            )));
        }

        async {
            result_receiver
                .await
                .unwrap_or_else(|_| Err(generic_error("Write task failed".into())))
        }
        .boxed()
    }

    /// Finishes the upload: waits for all queued parts to be written, then
    /// renames the temp file over the final path (overwriting).
    async fn complete(&mut self) -> Result<PutResult> {
        // `take` ensures complete/abort can only run once.
        if let Some((handle, sender)) = self.sender.take() {
            // Dropping the sender closes the channel, letting the writer task
            // drain remaining parts and close the file.
            drop(sender);

            // First `?` is the join error, second the task's own Result.
            handle.await??;

            self.client
                .rename(&self.tmp_filename, &self.final_filename, true)
                .await
                .to_object_store_err()?;

            Ok(PutResult {
                e_tag: None,
                version: None,
            })
        } else {
            Err(generic_error(
                "Cannot call abort or complete multiple times".into(),
            ))
        }
    }

    /// Aborts the upload: cancels the writer task and deletes the temp file.
    async fn abort(&mut self) -> Result<()> {
        if let Some((handle, sender)) = self.sender.take() {
            drop(sender);

            // Cancel outstanding writes; the temp file is removed below.
            handle.abort();

            self.client
                .delete(&self.tmp_filename, false)
                .await
                .to_object_store_err()?;

            Ok(())
        } else {
            Err(generic_error(
                "Cannot call abort or complete multiple times".into(),
            ))
        }
    }
}
663
664fn make_absolute_file(path: &Path) -> String {
666 format!("/{}", path.as_ref())
667}
668
669fn make_absolute_dir(path: &Path) -> String {
670 if path.parts().count() > 0 {
671 format!("/{}/", path.as_ref())
672 } else {
673 "/".to_string()
674 }
675}
676
677fn get_etag(status: &FileStatus) -> String {
678 let size = status.length;
679 let mtime = status.modification_time;
680
681 format!("{mtime:x}-{size:x}")
685}
686
687fn get_object_meta(status: &FileStatus) -> Result<ObjectMeta> {
688 Ok(ObjectMeta {
689 location: Path::parse(&status.path)?,
690 last_modified: DateTime::<Utc>::from_timestamp_millis(status.modification_time as i64)
691 .ok_or(generic_error(
692 "Last modified timestamp out of bounds".into(),
693 ))?,
694 size: status.length as u64,
695 e_tag: Some(get_etag(status)),
696 version: None,
697 })
698}
699
#[cfg(test)]
#[cfg(feature = "integration-test")]
mod test {
    use std::collections::HashSet;

    use object_store::integration::*;
    use serial_test::serial;
    use tokio::runtime::Runtime;

    use crate::HdfsObjectStoreBuilder;

    /// Runs the upstream `object_store` integration suite against an
    /// in-process MiniDfs cluster started with HA enabled.
    /// `#[serial]` because each test spins up its own cluster.
    #[tokio::test]
    #[serial]
    async fn hdfs_test() {
        let dfs = hdfs_native::minidfs::MiniDfs::with_features(&HashSet::from([
            hdfs_native::minidfs::DfsFeatures::HA,
        ]));

        let integration = HdfsObjectStoreBuilder::new()
            .with_url(&dfs.url)
            .build()
            .unwrap();

        put_get_delete_list(&integration).await;
        list_uses_directories_correctly(&integration).await;
        list_with_delimiter(&integration).await;
        rename_and_copy(&integration).await;
        copy_if_not_exists(&integration).await;
        multipart_race_condition(&integration, true).await;
        multipart_out_of_order(&integration).await;
        get_opts(&integration).await;
        put_opts(&integration, false).await;
    }

    /// Exercises the store from a non-tokio executor
    /// (`futures::executor::block_on`), both with the builder's defaults and
    /// with an explicitly supplied tokio I/O runtime.
    #[test]
    #[serial]
    fn test_no_tokio() {
        let dfs = hdfs_native::minidfs::MiniDfs::with_features(&HashSet::from([
            hdfs_native::minidfs::DfsFeatures::HA,
        ]));

        let integration = HdfsObjectStoreBuilder::new()
            .with_url(&dfs.url)
            .build()
            .unwrap();

        futures::executor::block_on(get_opts(&integration));

        let rt = Runtime::new().unwrap();

        // Same checks, but with client I/O pinned to a dedicated runtime.
        let integration = HdfsObjectStoreBuilder::new()
            .with_url(&dfs.url)
            .with_io_runtime(rt.handle().clone())
            .build()
            .unwrap();

        futures::executor::block_on(get_opts(&integration));
    }
}