1use std::{
16 collections::HashMap,
17 fmt::{Display, Formatter},
18 future,
19 path::PathBuf,
20 sync::Arc,
21};
22
23use async_trait::async_trait;
24use chrono::{DateTime, Utc};
25use futures::{
26 stream::{BoxStream, StreamExt},
27 FutureExt,
28};
29use hdfs_native::{client::FileStatus, file::FileWriter, Client, HdfsError, WriteOptions};
30use object_store::{
31 path::Path, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload,
32 ObjectMeta, ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
33 UploadPart,
34};
35use tokio::{
36 sync::{mpsc, oneshot},
37 task::{self, JoinHandle},
38};
39
40#[cfg(feature = "integration-test")]
42pub use hdfs_native::minidfs;
43
44#[derive(Debug)]
45pub struct HdfsObjectStore {
46 client: Arc<Client>,
47}
48
49impl HdfsObjectStore {
50 pub fn new(client: Arc<Client>) -> Self {
60 Self { client }
61 }
62
63 pub fn with_url(url: &str) -> Result<Self> {
74 Ok(Self::new(Arc::new(Client::new(url).to_object_store_err()?)))
75 }
76
77 pub fn with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
94 Ok(Self::new(Arc::new(
95 Client::new_with_config(url, config).to_object_store_err()?,
96 )))
97 }
98
99 async fn internal_copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
100 let overwrite = match self.client.get_file_info(&make_absolute_file(to)).await {
101 Ok(_) if overwrite => true,
102 Ok(_) => Err(HdfsError::AlreadyExists(make_absolute_file(to))).to_object_store_err()?,
103 Err(HdfsError::FileNotFound(_)) => false,
104 Err(e) => Err(e).to_object_store_err()?,
105 };
106
107 let write_options = WriteOptions {
108 overwrite,
109 ..Default::default()
110 };
111
112 let file = self
113 .client
114 .read(&make_absolute_file(from))
115 .await
116 .to_object_store_err()?;
117 let mut stream = file.read_range_stream(0, file.file_length()).boxed();
118
119 let mut new_file = self
120 .client
121 .create(&make_absolute_file(to), write_options)
122 .await
123 .to_object_store_err()?;
124
125 while let Some(bytes) = stream.next().await.transpose().to_object_store_err()? {
126 new_file.write(bytes).await.to_object_store_err()?;
127 }
128 new_file.close().await.to_object_store_err()?;
129
130 Ok(())
131 }
132
133 async fn open_tmp_file(&self, file_path: &str) -> Result<(FileWriter, String)> {
134 let path_buf = PathBuf::from(file_path);
135
136 let file_name = path_buf
137 .file_name()
138 .ok_or(HdfsError::InvalidPath("path missing filename".to_string()))
139 .to_object_store_err()?
140 .to_str()
141 .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
142 .to_object_store_err()?
143 .to_string();
144
145 let tmp_file_path = path_buf
146 .with_file_name(format!(".{}.tmp", file_name))
147 .to_str()
148 .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
149 .to_object_store_err()?
150 .to_string();
151
152 let mut index = 1;
154 loop {
155 let path = format!("{}.{}", tmp_file_path, index);
156 match self.client.create(&path, WriteOptions::default()).await {
157 Ok(writer) => break Ok((writer, path)),
158 Err(HdfsError::AlreadyExists(_)) => index += 1,
159 Err(e) => break Err(e).to_object_store_err(),
160 }
161 }
162 }
163}
164
165impl Display for HdfsObjectStore {
166 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
167 write!(f, "HdfsObjectStore")
168 }
169}
170
171impl From<Client> for HdfsObjectStore {
172 fn from(value: Client) -> Self {
173 Self::new(Arc::new(value))
174 }
175}
176
177#[async_trait]
178impl ObjectStore for HdfsObjectStore {
179 async fn put_opts(
185 &self,
186 location: &Path,
187 payload: PutPayload,
188 opts: PutOptions,
189 ) -> Result<PutResult> {
190 let overwrite = match opts.mode {
191 PutMode::Create => false,
192 PutMode::Overwrite => true,
193 PutMode::Update(_) => {
194 return Err(object_store::Error::NotSupported {
195 source: "Update mode not supported".to_string().into(),
196 })
197 }
198 };
199
200 let final_file_path = make_absolute_file(location);
201
202 if !overwrite && self.client.get_file_info(&final_file_path).await.is_ok() {
206 return Err(HdfsError::AlreadyExists(final_file_path)).to_object_store_err();
207 }
208
209 let (mut tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;
210
211 for bytes in payload {
212 tmp_file.write(bytes).await.to_object_store_err()?;
213 }
214 tmp_file.close().await.to_object_store_err()?;
215
216 self.client
217 .rename(&tmp_file_path, &final_file_path, overwrite)
218 .await
219 .to_object_store_err()?;
220
221 Ok(PutResult {
222 e_tag: None,
223 version: None,
224 })
225 }
226
227 async fn put_multipart_opts(
230 &self,
231 location: &Path,
232 _opts: PutMultipartOpts,
233 ) -> Result<Box<dyn MultipartUpload>> {
234 let final_file_path = make_absolute_file(location);
235
236 let (tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;
237
238 Ok(Box::new(HdfsMultipartWriter::new(
239 Arc::clone(&self.client),
240 tmp_file,
241 &tmp_file_path,
242 &final_file_path,
243 )))
244 }
245
246 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
248 if options.if_match.is_some()
249 || options.if_none_match.is_some()
250 || options.if_modified_since.is_some()
251 || options.if_unmodified_since.is_some()
252 {
253 return Err(object_store::Error::NotImplemented);
254 }
255
256 let meta = self.head(location).await?;
257
258 let range = options
259 .range
260 .map(|r| match r {
261 GetRange::Bounded(range) => range,
262 GetRange::Offset(offset) => offset..meta.size,
263 GetRange::Suffix(suffix) => meta.size.saturating_sub(suffix)..meta.size,
264 })
265 .unwrap_or(0..meta.size);
266
267 let reader = self
268 .client
269 .read(&make_absolute_file(location))
270 .await
271 .to_object_store_err()?;
272 let start: usize = range
273 .start
274 .try_into()
275 .expect("unable to convert range.start to usize");
276 let end: usize = range
277 .end
278 .try_into()
279 .expect("unable to convert range.end to usize");
280 let stream = reader
281 .read_range_stream(start, end - start)
282 .map(|b| b.to_object_store_err())
283 .boxed();
284
285 let payload = GetResultPayload::Stream(stream);
286
287 Ok(GetResult {
288 payload,
289 meta,
290 range,
291 attributes: Default::default(),
292 })
293 }
294
295 async fn head(&self, location: &Path) -> Result<ObjectMeta> {
297 let status = self
298 .client
299 .get_file_info(&make_absolute_file(location))
300 .await
301 .to_object_store_err()?;
302
303 if status.isdir {
304 return Err(HdfsError::IsADirectoryError(
305 "Head must be called on a file".to_string(),
306 ))
307 .to_object_store_err();
308 }
309
310 get_object_meta(&status)
311 }
312
313 async fn delete(&self, location: &Path) -> Result<()> {
315 let result = self
316 .client
317 .delete(&make_absolute_file(location), false)
318 .await
319 .to_object_store_err()?;
320
321 if !result {
322 Err(HdfsError::OperationFailed(
323 "failed to delete object".to_string(),
324 ))
325 .to_object_store_err()?
326 }
327
328 Ok(())
329 }
330
331 fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
338 let status_stream = self
339 .client
340 .list_status_iter(
341 &prefix.map(make_absolute_dir).unwrap_or("".to_string()),
342 true,
343 )
344 .into_stream()
345 .filter(|res| {
346 let result = match res {
347 Ok(status) => !status.isdir,
348 Err(HdfsError::FileNotFound(_)) => false,
350 _ => true,
351 };
352 future::ready(result)
353 })
354 .map(|res| res.map_or_else(|e| Err(e).to_object_store_err(), |s| get_object_meta(&s)));
355
356 Box::pin(status_stream)
357 }
358
359 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
366 let mut status_stream = self
367 .client
368 .list_status_iter(
369 &prefix.map(make_absolute_dir).unwrap_or("".to_string()),
370 false,
371 )
372 .into_stream()
373 .filter(|res| {
374 let result = match res {
375 Err(HdfsError::FileNotFound(_)) => false,
377 _ => true,
378 };
379 future::ready(result)
380 });
381
382 let mut statuses = Vec::<FileStatus>::new();
383 while let Some(status) = status_stream.next().await {
384 statuses.push(status.to_object_store_err()?);
385 }
386
387 let mut dirs: Vec<Path> = Vec::new();
388 for status in statuses.iter().filter(|s| s.isdir) {
389 dirs.push(Path::parse(&status.path)?)
390 }
391
392 let mut files: Vec<ObjectMeta> = Vec::new();
393 for status in statuses.iter().filter(|s| !s.isdir) {
394 files.push(get_object_meta(status)?)
395 }
396
397 Ok(ListResult {
398 common_prefixes: dirs,
399 objects: files,
400 })
401 }
402
403 async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
405 Ok(self
406 .client
407 .rename(&make_absolute_file(from), &make_absolute_file(to), true)
408 .await
409 .to_object_store_err()?)
410 }
411
412 async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
415 Ok(self
416 .client
417 .rename(&make_absolute_file(from), &make_absolute_file(to), false)
418 .await
419 .to_object_store_err()?)
420 }
421
422 async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
426 self.internal_copy(from, to, true).await
427 }
428
429 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
437 self.internal_copy(from, to, false).await
438 }
439}
440
441#[cfg(feature = "integration-test")]
442pub trait HdfsErrorConvert<T> {
443 fn to_object_store_err(self) -> Result<T>;
444}
445
446#[cfg(not(feature = "integration-test"))]
447trait HdfsErrorConvert<T> {
448 fn to_object_store_err(self) -> Result<T>;
449}
450
451impl<T> HdfsErrorConvert<T> for hdfs_native::Result<T> {
452 fn to_object_store_err(self) -> Result<T> {
453 self.map_err(|err| match err {
454 HdfsError::FileNotFound(path) => object_store::Error::NotFound {
455 path: path.clone(),
456 source: Box::new(HdfsError::FileNotFound(path)),
457 },
458 HdfsError::AlreadyExists(path) => object_store::Error::AlreadyExists {
459 path: path.clone(),
460 source: Box::new(HdfsError::AlreadyExists(path)),
461 },
462 _ => object_store::Error::Generic {
463 store: "HdfsObjectStore",
464 source: Box::new(err),
465 },
466 })
467 }
468}
469
470type PartSender = mpsc::UnboundedSender<(oneshot::Sender<Result<()>>, PutPayload)>;
471
472struct HdfsMultipartWriter {
477 client: Arc<Client>,
478 sender: Option<(JoinHandle<Result<()>>, PartSender)>,
479 tmp_filename: String,
480 final_filename: String,
481}
482
483impl HdfsMultipartWriter {
484 fn new(
485 client: Arc<Client>,
486 writer: FileWriter,
487 tmp_filename: &str,
488 final_filename: &str,
489 ) -> Self {
490 let (sender, receiver) = mpsc::unbounded_channel();
491
492 Self {
493 client,
494 sender: Some((Self::start_writer_task(writer, receiver), sender)),
495 tmp_filename: tmp_filename.to_string(),
496 final_filename: final_filename.to_string(),
497 }
498 }
499
500 fn start_writer_task(
501 mut writer: FileWriter,
502 mut part_receiver: mpsc::UnboundedReceiver<(oneshot::Sender<Result<()>>, PutPayload)>,
503 ) -> JoinHandle<Result<()>> {
504 task::spawn(async move {
505 'outer: loop {
506 match part_receiver.recv().await {
507 Some((sender, part)) => {
508 for bytes in part {
509 if let Err(e) = writer.write(bytes).await.to_object_store_err() {
510 let _ = sender.send(Err(e));
511 break 'outer;
512 }
513 }
514 let _ = sender.send(Ok(()));
515 }
516 None => {
517 return writer.close().await.to_object_store_err();
518 }
519 }
520 }
521
522 while let Some((sender, _)) = part_receiver.recv().await {
524 let _ = sender.send(
525 Err(HdfsError::OperationFailed(
526 "Write failed during one of the parts".to_string(),
527 ))
528 .to_object_store_err(),
529 );
530 }
531 Err(HdfsError::OperationFailed(
532 "Write failed during one of the parts".to_string(),
533 ))
534 .to_object_store_err()
535 })
536 }
537}
538
539impl std::fmt::Debug for HdfsMultipartWriter {
540 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
541 f.debug_struct("HdfsMultipartWriter")
542 .field("tmp_filename", &self.tmp_filename)
543 .field("final_filename", &self.final_filename)
544 .finish()
545 }
546}
547
548#[async_trait]
549impl MultipartUpload for HdfsMultipartWriter {
550 fn put_part(&mut self, payload: PutPayload) -> UploadPart {
551 let (result_sender, result_receiver) = oneshot::channel();
552
553 if let Some((_, payload_sender)) = self.sender.as_ref() {
554 payload_sender.send((result_sender, payload)).unwrap();
555 } else {
556 result_sender
557 .send(
558 Err(HdfsError::OperationFailed(
559 "Cannot put part after completing or aborting".to_string(),
560 ))
561 .to_object_store_err(),
562 )
563 .unwrap();
564 }
565
566 async { result_receiver.await.unwrap() }.boxed()
567 }
568
569 async fn complete(&mut self) -> Result<PutResult> {
570 if let Some((handle, sender)) = self.sender.take() {
572 drop(sender);
573
574 handle.await??;
576
577 self.client
578 .rename(&self.tmp_filename, &self.final_filename, true)
579 .await
580 .to_object_store_err()?;
581
582 Ok(PutResult {
583 e_tag: None,
584 version: None,
585 })
586 } else {
587 Err(object_store::Error::NotSupported {
588 source: "Cannot call abort or complete multiple times".into(),
589 })
590 }
591 }
592
593 async fn abort(&mut self) -> Result<()> {
594 if let Some((handle, sender)) = self.sender.take() {
596 drop(sender);
597
598 handle.abort();
600
601 self.client
602 .delete(&self.tmp_filename, false)
603 .await
604 .to_object_store_err()?;
605
606 Ok(())
607 } else {
608 Err(object_store::Error::NotSupported {
609 source: "Cannot call abort or complete multiple times".into(),
610 })
611 }
612 }
613}
614
615fn make_absolute_file(path: &Path) -> String {
617 format!("/{}", path.as_ref())
618}
619
620fn make_absolute_dir(path: &Path) -> String {
621 if path.parts().count() > 0 {
622 format!("/{}/", path.as_ref())
623 } else {
624 "/".to_string()
625 }
626}
627
628fn get_object_meta(status: &FileStatus) -> Result<ObjectMeta> {
629 Ok(ObjectMeta {
630 location: Path::parse(&status.path)?,
631 last_modified: DateTime::<Utc>::from_timestamp_millis(status.modification_time as i64)
632 .unwrap(),
633 size: status
634 .length
635 .try_into()
636 .expect("unable to convert status.length to usize"),
637 e_tag: None,
638 version: None,
639 })
640}