1use std::{
14 collections::HashMap,
15 fmt::{Display, Formatter},
16 future,
17 path::PathBuf,
18};
19
20use async_trait::async_trait;
21use chrono::{DateTime, Utc};
22use futures::{
23 stream::{BoxStream, StreamExt},
24 FutureExt,
25};
26use hdfs_native::{
27 client::FileStatus, file::FileWriter, Client, ClientBuilder, HdfsError, WriteOptions,
28};
29#[allow(deprecated)]
30use object_store::{
31 path::Path, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
32 ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result, UploadPart,
33};
34use tokio::{
35 runtime::Handle,
36 sync::{mpsc, oneshot},
37 task::{self, JoinHandle},
38};
39
40#[cfg(feature = "integration-test")]
42pub use hdfs_native::minidfs;
43
44fn generic_error(
45 source: Box<dyn std::error::Error + Send + Sync + 'static>,
46) -> object_store::Error {
47 object_store::Error::Generic {
48 store: "HFDS",
49 source,
50 }
51}
52
/// Builder for an [`HdfsObjectStore`]; a thin wrapper around `hdfs_native`'s
/// [`ClientBuilder`] that forwards every setting to it.
#[derive(Default)]
pub struct HdfsObjectStoreBuilder {
    // Underlying client builder all configuration is delegated to.
    inner: ClientBuilder,
}
58
59impl HdfsObjectStoreBuilder {
60 pub fn new() -> Self {
62 Self::default()
63 }
64
65 pub fn with_url(mut self, url: impl Into<String>) -> Self {
67 self.inner = self.inner.with_url(url);
68 self
69 }
70
71 pub fn with_config(
73 mut self,
74 config: impl IntoIterator<Item = (impl Into<String>, impl Into<String>)>,
75 ) -> Self {
76 self.inner = self.inner.with_config(config);
77 self
78 }
79
80 pub fn with_io_runtime(mut self, runtime: Handle) -> Self {
82 self.inner = self.inner.with_io_runtime(runtime);
83 self
84 }
85
86 pub fn build(self) -> Result<HdfsObjectStore> {
88 let client = self.inner.build().to_object_store_err()?;
89
90 Ok(HdfsObjectStore { client })
91 }
92}
93
/// An [`ObjectStore`] implementation backed by a native HDFS client.
#[derive(Debug, Clone)]
pub struct HdfsObjectStore {
    // Cloning the store clones this client handle.
    client: Client,
}
99
impl HdfsObjectStore {
    /// Creates a store from an already-constructed HDFS [`Client`].
    pub fn new(client: Client) -> Self {
        Self { client }
    }

    /// Creates a store connected to the given HDFS URL.
    #[deprecated(since = "0.15.0", note = "Use HdfsObjectStoreBuilder instead")]
    pub fn with_url(url: &str) -> Result<Self> {
        let client = ClientBuilder::new()
            .with_url(url)
            .build()
            .to_object_store_err()?;

        Ok(Self { client })
    }

    /// Creates a store connected to the given HDFS URL with extra Hadoop
    /// configuration key/value pairs.
    #[deprecated(since = "0.15.0", note = "Use HdfsObjectStoreBuilder instead")]
    pub fn with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
        let client = ClientBuilder::new()
            .with_url(url)
            .with_config(config)
            .build()
            .to_object_store_err()?;

        Ok(Self { client })
    }

    /// Copies `from` to `to` by streaming the source file's bytes into a newly
    /// created destination file.
    ///
    /// If `overwrite` is false and the destination already exists, fails with
    /// an `AlreadyExists` error before any data is read.
    async fn internal_copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
        // Shadowed: becomes true only when the destination exists AND the
        // caller allowed overwriting; a missing destination needs no overwrite.
        let overwrite = match self.client.get_file_info(&make_absolute_file(to)).await {
            Ok(_) if overwrite => true,
            Ok(_) => Err(HdfsError::AlreadyExists(make_absolute_file(to))).to_object_store_err()?,
            Err(HdfsError::FileNotFound(_)) => false,
            Err(e) => Err(e).to_object_store_err()?,
        };

        let write_options = WriteOptions {
            overwrite,
            ..Default::default()
        };

        // Stream the full length of the source file rather than buffering it.
        let file = self
            .client
            .read(&make_absolute_file(from))
            .await
            .to_object_store_err()?;
        let mut stream = file.read_range_stream(0, file.file_length()).boxed();

        let mut new_file = self
            .client
            .create(&make_absolute_file(to), write_options)
            .await
            .to_object_store_err()?;

        while let Some(bytes) = stream.next().await.transpose().to_object_store_err()? {
            new_file.write(bytes).await.to_object_store_err()?;
        }
        new_file.close().await.to_object_store_err()?;

        Ok(())
    }

    /// Opens a hidden temporary file next to `file_path`, named
    /// `.<name>.tmp.<index>`, incrementing `index` until an unused name is
    /// found. Returns the open writer together with the temporary path.
    async fn open_tmp_file(&self, file_path: &str) -> Result<(FileWriter, String)> {
        let path_buf = PathBuf::from(file_path);

        let file_name = path_buf
            .file_name()
            .ok_or(HdfsError::InvalidPath("path missing filename".to_string()))
            .to_object_store_err()?
            .to_str()
            .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
            .to_object_store_err()?
            .to_string();

        let tmp_file_path = path_buf
            .with_file_name(format!(".{file_name}.tmp"))
            .to_str()
            .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
            .to_object_store_err()?
            .to_string();

        // Probe candidate names until create succeeds; only AlreadyExists
        // triggers a retry, any other error aborts.
        let mut index = 1;
        loop {
            let path = format!("{tmp_file_path}.{index}");
            match self.client.create(&path, WriteOptions::default()).await {
                Ok(writer) => break Ok((writer, path)),
                Err(HdfsError::AlreadyExists(_)) => index += 1,
                Err(e) => break Err(e).to_object_store_err(),
            }
        }
    }
}
226
227impl Display for HdfsObjectStore {
228 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
229 write!(f, "HdfsObjectStore")
230 }
231}
232
233impl From<Client> for HdfsObjectStore {
234 fn from(value: Client) -> Self {
235 Self { client: value }
236 }
237}
238
#[async_trait]
impl ObjectStore for HdfsObjectStore {
    /// Writes `payload` atomically: data goes to a hidden temporary file which
    /// is then renamed to the final location.
    ///
    /// `PutMode::Create` fails with `AlreadyExists` if the target exists;
    /// `PutMode::Update` is not supported.
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: PutOptions,
    ) -> Result<PutResult> {
        let overwrite = match opts.mode {
            PutMode::Create => false,
            PutMode::Overwrite => true,
            PutMode::Update(_) => {
                return Err(object_store::Error::NotImplemented);
            }
        };

        let final_file_path = make_absolute_file(location);

        // Early existence check so we can fail before writing any data.
        // NOTE(review): check-then-rename is not atomic; the rename below with
        // overwrite=false is the authoritative guard.
        if !overwrite && self.client.get_file_info(&final_file_path).await.is_ok() {
            return Err(HdfsError::AlreadyExists(final_file_path)).to_object_store_err();
        }

        let (mut tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;

        for bytes in payload {
            tmp_file.write(bytes).await.to_object_store_err()?;
        }
        tmp_file.close().await.to_object_store_err()?;

        self.client
            .rename(&tmp_file_path, &final_file_path, overwrite)
            .await
            .to_object_store_err()?;

        // Re-stat the final file to produce the ETag for the result.
        let e_tag = self.head(location).await?.e_tag;

        Ok(PutResult {
            e_tag,
            version: None,
        })
    }

    /// Starts a multipart upload backed by a hidden temporary file and a
    /// background writer task; parts are streamed in order as they arrive.
    #[allow(deprecated)]
    async fn put_multipart_opts(
        &self,
        location: &Path,
        _opts: PutMultipartOpts,
    ) -> Result<Box<dyn MultipartUpload>> {
        let final_file_path = make_absolute_file(location);

        let (tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;

        Ok(Box::new(HdfsMultipartWriter::new(
            self.client.clone(),
            tmp_file,
            &tmp_file_path,
            &final_file_path,
        )))
    }

    /// Returns the object's bytes as a stream, honoring any requested range
    /// and the preconditions in `options`.
    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
        let meta = self.head(location).await?;

        options.check_preconditions(&meta)?;

        // Resolve the requested range against the actual size; default to the
        // whole file when no range was given.
        let range = options
            .range
            .map(|r| r.as_range(meta.size))
            .transpose()
            .map_err(|source| generic_error(source.into()))?
            .unwrap_or(0..meta.size);

        let reader = self
            .client
            .read(&make_absolute_file(location))
            .await
            .to_object_store_err()?;
        // NOTE(review): these conversions panic on 32-bit targets for files
        // larger than usize::MAX — acceptable given typical deployment targets.
        let start: usize = range
            .start
            .try_into()
            .expect("unable to convert range.start to usize");
        let end: usize = range
            .end
            .try_into()
            .expect("unable to convert range.end to usize");
        let stream = reader
            .read_range_stream(start, end - start)
            .map(|b| b.to_object_store_err())
            .boxed();

        let payload = GetResultPayload::Stream(stream);

        Ok(GetResult {
            payload,
            meta,
            range,
            attributes: Default::default(),
        })
    }

    /// Stats a single object. Directories are reported as `NotFound` because
    /// the `object_store` model has no directory objects.
    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
        let status = self
            .client
            .get_file_info(&make_absolute_file(location))
            .await
            .to_object_store_err()?;

        if status.isdir {
            return Err(object_store::Error::NotFound {
                path: location.to_string(),
                source: "Head cannot be called on a directory".into(),
            });
        }

        get_object_meta(&status)
    }

    /// Deletes a single file (non-recursive); `NotFound` if nothing was
    /// deleted.
    async fn delete(&self, location: &Path) -> Result<()> {
        let result = self
            .client
            .delete(&make_absolute_file(location), false)
            .await
            .to_object_store_err()?;

        if !result {
            Err(HdfsError::FileNotFound(location.to_string())).to_object_store_err()?
        }

        Ok(())
    }

    /// Recursively lists all files under `prefix` (the whole store when
    /// `None`). Directory entries and the prefix itself are filtered out, and
    /// a missing prefix yields an empty stream rather than an error.
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
        let absolute_dir = prefix.map(make_absolute_file).unwrap_or("/".to_string());

        let status_stream = self
            .client
            .list_status_iter(&absolute_dir, true)
            .into_stream()
            .filter(move |res| {
                let result = match res {
                    // Only emit files, and never the listed prefix itself.
                    Ok(status) => !status.isdir && status.path != absolute_dir,
                    // A missing prefix is an empty listing, not an error.
                    Err(HdfsError::FileNotFound(_)) => false,
                    _ => true,
                };
                future::ready(result)
            })
            .map(|res| res.map_or_else(|e| Err(e).to_object_store_err(), |s| get_object_meta(&s)));

        Box::pin(status_stream)
    }

    /// Non-recursive listing of `prefix`: immediate subdirectories become
    /// `common_prefixes`, immediate files become `objects`.
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
        let absolute_dir = prefix.map(make_absolute_file).unwrap_or("/".to_string());

        let mut status_stream = self
            .client
            .list_status_iter(&absolute_dir, false)
            .into_stream()
            .filter(move |res| {
                let result = match res {
                    Ok(status) => status.path != absolute_dir,
                    // A missing prefix is an empty listing, not an error.
                    Err(HdfsError::FileNotFound(_)) => false,
                    _ => true,
                };
                future::ready(result)
            });

        let mut statuses = Vec::<FileStatus>::new();
        while let Some(status) = status_stream.next().await {
            statuses.push(status.to_object_store_err()?);
        }

        let mut dirs: Vec<Path> = Vec::new();
        for status in statuses.iter().filter(|s| s.isdir) {
            dirs.push(Path::parse(&status.path)?)
        }

        let mut files: Vec<ObjectMeta> = Vec::new();
        for status in statuses.iter().filter(|s| !s.isdir) {
            files.push(get_object_meta(status)?)
        }

        Ok(ListResult {
            common_prefixes: dirs,
            objects: files,
        })
    }

    /// Renames `from` to `to`, replacing any existing destination. The
    /// destination's parent directories are created first if needed.
    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
        let mut parent: Vec<_> = to.parts().collect();
        parent.pop();

        if !parent.is_empty() {
            let parent_path: Path = parent.into_iter().collect();
            self.client
                .mkdirs(&make_absolute_dir(&parent_path), 0o755, true)
                .await
                .to_object_store_err()?;
        }

        Ok(self
            .client
            .rename(&make_absolute_file(from), &make_absolute_file(to), true)
            .await
            .to_object_store_err()?)
    }

    /// Renames `from` to `to`, failing if the destination already exists.
    /// NOTE(review): unlike `rename`, this does not create parent directories.
    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        self.client
            .rename(&make_absolute_file(from), &make_absolute_file(to), false)
            .await
            .to_object_store_err()
    }

    /// Copies `from` to `to`, replacing any existing destination.
    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
        self.internal_copy(from, to, true).await
    }

    /// Copies `from` to `to`, failing if the destination already exists.
    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        self.internal_copy(from, to, false).await
    }
}
505
/// Conversion from `hdfs_native` results into `object_store` results,
/// preserving the error variants callers match on (`NotFound`,
/// `AlreadyExists`).
trait HdfsErrorConvert<T> {
    fn to_object_store_err(self) -> Result<T>;
}
509
510impl<T> HdfsErrorConvert<T> for hdfs_native::Result<T> {
511 fn to_object_store_err(self) -> Result<T> {
512 self.map_err(|err| match err {
513 HdfsError::FileNotFound(path) => object_store::Error::NotFound {
514 path: path.clone(),
515 source: Box::new(HdfsError::FileNotFound(path)),
516 },
517 HdfsError::AlreadyExists(path) => object_store::Error::AlreadyExists {
518 path: path.clone(),
519 source: Box::new(HdfsError::AlreadyExists(path)),
520 },
521 _ => object_store::Error::Generic {
522 store: "HdfsObjectStore",
523 source: Box::new(err),
524 },
525 })
526 }
527}
528
/// Channel end used to feed payload parts to the background writer task; each
/// part travels with a oneshot sender on which its write result is reported.
type PartSender = mpsc::UnboundedSender<(oneshot::Sender<Result<()>>, PutPayload)>;
530
/// Multipart upload that streams parts to a hidden temporary file via a
/// background task, renaming to the final name on `complete`.
struct HdfsMultipartWriter {
    client: Client,
    // Writer-task handle plus the channel feeding it; `None` once the upload
    // has been completed or aborted.
    sender: Option<(JoinHandle<Result<()>>, PartSender)>,
    tmp_filename: String,
    final_filename: String,
}
541
impl HdfsMultipartWriter {
    /// Creates the writer and immediately spawns the background task that
    /// drains parts from the channel into `writer`.
    fn new(client: Client, writer: FileWriter, tmp_filename: &str, final_filename: &str) -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();

        Self {
            client,
            sender: Some((Self::start_writer_task(writer, receiver), sender)),
            tmp_filename: tmp_filename.to_string(),
            final_filename: final_filename.to_string(),
        }
    }

    /// Spawns the task that sequentially writes each received part and reports
    /// its result back on the part's oneshot channel.
    ///
    /// The task ends when the channel closes (file is then closed, flushing
    /// data) or when a write fails (the failing part is notified and the task
    /// returns an error).
    fn start_writer_task(
        mut writer: FileWriter,
        mut part_receiver: mpsc::UnboundedReceiver<(oneshot::Sender<Result<()>>, PutPayload)>,
    ) -> JoinHandle<Result<()>> {
        task::spawn(async move {
            loop {
                match part_receiver.recv().await {
                    Some((sender, part)) => {
                        for bytes in part {
                            if let Err(e) = writer.write(bytes).await {
                                // Notify the waiting part, then bail; the
                                // receiver result is ignored since the waiter
                                // may have gone away.
                                let _ = sender.send(Err(e).to_object_store_err());
                                return Err(generic_error("Failed to write all parts".into()));
                            }
                        }
                        let _ = sender.send(Ok(()));
                    }
                    // Channel closed: all parts written, close the file.
                    None => return writer.close().await.to_object_store_err(),
                }
            }
        })
    }
}
576
577impl std::fmt::Debug for HdfsMultipartWriter {
578 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
579 f.debug_struct("HdfsMultipartWriter")
580 .field("tmp_filename", &self.tmp_filename)
581 .field("final_filename", &self.final_filename)
582 .finish()
583 }
584}
585
#[async_trait]
impl MultipartUpload for HdfsMultipartWriter {
    /// Queues `payload` for the background writer task and returns a future
    /// resolving to that part's write result.
    fn put_part(&mut self, payload: PutPayload) -> UploadPart {
        let (result_sender, result_receiver) = oneshot::channel();

        if let Some((_, payload_sender)) = self.sender.as_ref() {
            // Send can only fail if the writer task already exited; recover
            // the oneshot from the rejected message and report the failure.
            if let Err(mpsc::error::SendError((result_sender, _))) =
                payload_sender.send((result_sender, payload))
            {
                let _ = result_sender.send(Err(generic_error("Write task failed".into())));
            }
        } else {
            // complete()/abort() already ran; no task exists to accept parts.
            let _ = result_sender.send(Err(generic_error(
                "Cannot put part after completing or aborting".into(),
            )));
        }

        async {
            result_receiver
                .await
                // A dropped oneshot means the task died before responding.
                .unwrap_or_else(|_| Err(generic_error("Write task failed".into())))
        }
        .boxed()
    }

    /// Finishes the upload: closes the channel so the task flushes and closes
    /// the file, waits for it, then renames the temporary file into place.
    async fn complete(&mut self) -> Result<PutResult> {
        // take() ensures complete/abort can only run once.
        if let Some((handle, sender)) = self.sender.take() {
            // Dropping the sender closes the channel, signalling the task to
            // close the file.
            drop(sender);

            handle.await??;

            self.client
                .rename(&self.tmp_filename, &self.final_filename, true)
                .await
                .to_object_store_err()?;

            Ok(PutResult {
                e_tag: None,
                version: None,
            })
        } else {
            Err(generic_error(
                "Cannot call abort or complete multiple times".into(),
            ))
        }
    }

    /// Cancels the upload: kills the writer task and deletes the temporary
    /// file.
    async fn abort(&mut self) -> Result<()> {
        if let Some((handle, sender)) = self.sender.take() {
            drop(sender);

            handle.abort();

            self.client
                .delete(&self.tmp_filename, false)
                .await
                .to_object_store_err()?;

            Ok(())
        } else {
            Err(generic_error(
                "Cannot call abort or complete multiple times".into(),
            ))
        }
    }
}
656
657fn make_absolute_file(path: &Path) -> String {
659 format!("/{}", path.as_ref())
660}
661
662fn make_absolute_dir(path: &Path) -> String {
663 if path.parts().count() > 0 {
664 format!("/{}/", path.as_ref())
665 } else {
666 "/".to_string()
667 }
668}
669
670fn get_etag(status: &FileStatus) -> String {
671 let size = status.length;
672 let mtime = status.modification_time;
673
674 format!("{mtime:x}-{size:x}")
678}
679
680fn get_object_meta(status: &FileStatus) -> Result<ObjectMeta> {
681 Ok(ObjectMeta {
682 location: Path::parse(&status.path)?,
683 last_modified: DateTime::<Utc>::from_timestamp_millis(status.modification_time as i64)
684 .ok_or(generic_error(
685 "Last modified timestamp out of bounds".into(),
686 ))?,
687 size: status.length as u64,
688 e_tag: Some(get_etag(status)),
689 version: None,
690 })
691}
692
#[cfg(test)]
#[cfg(feature = "integration-test")]
mod test {
    use std::collections::HashSet;

    use object_store::integration::*;
    use serial_test::serial;
    use tokio::runtime::Runtime;

    use crate::HdfsObjectStoreBuilder;

    // Runs the full object_store integration suite against an in-process
    // MiniDfs cluster with HA enabled. Serialized because MiniDfs instances
    // cannot coexist.
    #[tokio::test]
    #[serial]
    async fn hdfs_test() {
        let dfs = hdfs_native::minidfs::MiniDfs::with_features(&HashSet::from([
            hdfs_native::minidfs::DfsFeatures::HA,
        ]));

        let integration = HdfsObjectStoreBuilder::new()
            .with_url(&dfs.url)
            .build()
            .unwrap();

        put_get_delete_list(&integration).await;
        list_uses_directories_correctly(&integration).await;
        list_with_delimiter(&integration).await;
        rename_and_copy(&integration).await;
        copy_if_not_exists(&integration).await;
        multipart_race_condition(&integration, true).await;
        multipart_out_of_order(&integration).await;
        get_opts(&integration).await;
        put_opts(&integration, false).await;
    }

    // Verifies the store works without an ambient tokio runtime, both with
    // the default IO handling and with an explicitly supplied IO runtime.
    #[test]
    #[serial]
    fn test_no_tokio() {
        let dfs = hdfs_native::minidfs::MiniDfs::with_features(&HashSet::from([
            hdfs_native::minidfs::DfsFeatures::HA,
        ]));

        let integration = HdfsObjectStoreBuilder::new()
            .with_url(&dfs.url)
            .build()
            .unwrap();

        futures::executor::block_on(get_opts(&integration));

        let rt = Runtime::new().unwrap();

        let integration = HdfsObjectStoreBuilder::new()
            .with_url(&dfs.url)
            .with_io_runtime(rt.handle().clone())
            .build()
            .unwrap();

        futures::executor::block_on(get_opts(&integration));
    }
}