1use std::{
16 collections::HashMap,
17 fmt::{Display, Formatter},
18 future,
19 path::PathBuf,
20 sync::Arc,
21};
22
23use async_trait::async_trait;
24use chrono::{DateTime, Utc};
25use futures::{
26 stream::{BoxStream, StreamExt},
27 FutureExt,
28};
29use hdfs_native::{client::FileStatus, file::FileWriter, Client, HdfsError, WriteOptions};
30use object_store::{
31 path::Path, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload,
32 ObjectMeta, ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
33 UploadPart,
34};
35use tokio::{
36 sync::{mpsc, oneshot},
37 task::{self, JoinHandle},
38};
39
40#[cfg(feature = "integration-test")]
42pub use hdfs_native::minidfs;
43
/// An [`ObjectStore`] implementation backed by a native-Rust HDFS [`Client`].
#[derive(Debug)]
pub struct HdfsObjectStore {
    // Shared via `Arc` so background multipart-upload tasks can hold their own
    // handle to the client.
    client: Arc<Client>,
}
48
49impl HdfsObjectStore {
50 pub fn new(client: Arc<Client>) -> Self {
60 Self { client }
61 }
62
63 pub fn with_url(url: &str) -> Result<Self> {
74 Ok(Self::new(Arc::new(Client::new(url).to_object_store_err()?)))
75 }
76
77 pub fn with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
94 Ok(Self::new(Arc::new(
95 Client::new_with_config(url, config).to_object_store_err()?,
96 )))
97 }
98
99 async fn internal_copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
100 let overwrite = match self.client.get_file_info(&make_absolute_file(to)).await {
101 Ok(_) if overwrite => true,
102 Ok(_) => Err(HdfsError::AlreadyExists(make_absolute_file(to))).to_object_store_err()?,
103 Err(HdfsError::FileNotFound(_)) => false,
104 Err(e) => Err(e).to_object_store_err()?,
105 };
106
107 let write_options = WriteOptions {
108 overwrite,
109 ..Default::default()
110 };
111
112 let file = self
113 .client
114 .read(&make_absolute_file(from))
115 .await
116 .to_object_store_err()?;
117 let mut stream = file.read_range_stream(0, file.file_length()).boxed();
118
119 let mut new_file = self
120 .client
121 .create(&make_absolute_file(to), write_options)
122 .await
123 .to_object_store_err()?;
124
125 while let Some(bytes) = stream.next().await.transpose().to_object_store_err()? {
126 new_file.write(bytes).await.to_object_store_err()?;
127 }
128 new_file.close().await.to_object_store_err()?;
129
130 Ok(())
131 }
132
133 async fn open_tmp_file(&self, file_path: &str) -> Result<(FileWriter, String)> {
134 let path_buf = PathBuf::from(file_path);
135
136 let file_name = path_buf
137 .file_name()
138 .ok_or(HdfsError::InvalidPath("path missing filename".to_string()))
139 .to_object_store_err()?
140 .to_str()
141 .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
142 .to_object_store_err()?
143 .to_string();
144
145 let tmp_file_path = path_buf
146 .with_file_name(format!(".{}.tmp", file_name))
147 .to_str()
148 .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
149 .to_object_store_err()?
150 .to_string();
151
152 let mut index = 1;
154 loop {
155 let path = format!("{}.{}", tmp_file_path, index);
156 match self.client.create(&path, WriteOptions::default()).await {
157 Ok(writer) => break Ok((writer, path)),
158 Err(HdfsError::AlreadyExists(_)) => index += 1,
159 Err(e) => break Err(e).to_object_store_err(),
160 }
161 }
162 }
163}
164
165impl Display for HdfsObjectStore {
166 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
167 write!(f, "HdfsObjectStore")
168 }
169}
170
171impl From<Client> for HdfsObjectStore {
172 fn from(value: Client) -> Self {
173 Self::new(Arc::new(value))
174 }
175}
176
177#[async_trait]
178impl ObjectStore for HdfsObjectStore {
179 async fn put_opts(
185 &self,
186 location: &Path,
187 payload: PutPayload,
188 opts: PutOptions,
189 ) -> Result<PutResult> {
190 let overwrite = match opts.mode {
191 PutMode::Create => false,
192 PutMode::Overwrite => true,
193 PutMode::Update(_) => {
194 return Err(object_store::Error::NotSupported {
195 source: "Update mode not supported".to_string().into(),
196 })
197 }
198 };
199
200 let final_file_path = make_absolute_file(location);
201
202 if !overwrite && self.client.get_file_info(&final_file_path).await.is_ok() {
206 return Err(HdfsError::AlreadyExists(final_file_path)).to_object_store_err();
207 }
208
209 let (mut tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;
210
211 for bytes in payload {
212 tmp_file.write(bytes).await.to_object_store_err()?;
213 }
214 tmp_file.close().await.to_object_store_err()?;
215
216 self.client
217 .rename(&tmp_file_path, &final_file_path, overwrite)
218 .await
219 .to_object_store_err()?;
220
221 Ok(PutResult {
222 e_tag: None,
223 version: None,
224 })
225 }
226
227 async fn put_multipart_opts(
230 &self,
231 location: &Path,
232 _opts: PutMultipartOpts,
233 ) -> Result<Box<dyn MultipartUpload>> {
234 let final_file_path = make_absolute_file(location);
235
236 let (tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;
237
238 Ok(Box::new(HdfsMultipartWriter::new(
239 Arc::clone(&self.client),
240 tmp_file,
241 &tmp_file_path,
242 &final_file_path,
243 )))
244 }
245
246 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
248 if options.if_match.is_some()
249 || options.if_none_match.is_some()
250 || options.if_modified_since.is_some()
251 || options.if_unmodified_since.is_some()
252 {
253 return Err(object_store::Error::NotImplemented);
254 }
255
256 let meta = self.head(location).await?;
257
258 let range = options
259 .range
260 .map(|r| match r {
261 GetRange::Bounded(range) => range,
262 GetRange::Offset(offset) => offset..meta.size,
263 GetRange::Suffix(suffix) => meta.size.saturating_sub(suffix)..meta.size,
264 })
265 .unwrap_or(0..meta.size);
266
267 let reader = self
268 .client
269 .read(&make_absolute_file(location))
270 .await
271 .to_object_store_err()?;
272 let stream = reader
273 .read_range_stream(range.start, range.end - range.start)
274 .map(|b| b.to_object_store_err())
275 .boxed();
276
277 let payload = GetResultPayload::Stream(stream);
278
279 Ok(GetResult {
280 payload,
281 meta,
282 range,
283 attributes: Default::default(),
284 })
285 }
286
287 async fn head(&self, location: &Path) -> Result<ObjectMeta> {
289 let status = self
290 .client
291 .get_file_info(&make_absolute_file(location))
292 .await
293 .to_object_store_err()?;
294
295 if status.isdir {
296 return Err(HdfsError::IsADirectoryError(
297 "Head must be called on a file".to_string(),
298 ))
299 .to_object_store_err();
300 }
301
302 Ok(ObjectMeta {
303 location: location.clone(),
304 last_modified: DateTime::<Utc>::from_timestamp(status.modification_time as i64, 0)
305 .unwrap(),
306 size: status.length,
307 e_tag: None,
308 version: None,
309 })
310 }
311
312 async fn delete(&self, location: &Path) -> Result<()> {
314 let result = self
315 .client
316 .delete(&make_absolute_file(location), false)
317 .await
318 .to_object_store_err()?;
319
320 if !result {
321 Err(HdfsError::OperationFailed(
322 "failed to delete object".to_string(),
323 ))
324 .to_object_store_err()?
325 }
326
327 Ok(())
328 }
329
330 fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
337 let status_stream = self
338 .client
339 .list_status_iter(
340 &prefix.map(make_absolute_dir).unwrap_or("".to_string()),
341 true,
342 )
343 .into_stream()
344 .filter(|res| {
345 let result = match res {
346 Ok(status) => !status.isdir,
347 Err(HdfsError::FileNotFound(_)) => false,
349 _ => true,
350 };
351 future::ready(result)
352 })
353 .map(|res| res.map_or_else(|e| Err(e).to_object_store_err(), |s| get_object_meta(&s)));
354
355 Box::pin(status_stream)
356 }
357
358 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
365 let mut status_stream = self
366 .client
367 .list_status_iter(
368 &prefix.map(make_absolute_dir).unwrap_or("".to_string()),
369 false,
370 )
371 .into_stream()
372 .filter(|res| {
373 let result = match res {
374 Err(HdfsError::FileNotFound(_)) => false,
376 _ => true,
377 };
378 future::ready(result)
379 });
380
381 let mut statuses = Vec::<FileStatus>::new();
382 while let Some(status) = status_stream.next().await {
383 statuses.push(status.to_object_store_err()?);
384 }
385
386 let mut dirs: Vec<Path> = Vec::new();
387 for status in statuses.iter().filter(|s| s.isdir) {
388 dirs.push(Path::parse(&status.path)?)
389 }
390
391 let mut files: Vec<ObjectMeta> = Vec::new();
392 for status in statuses.iter().filter(|s| !s.isdir) {
393 files.push(get_object_meta(status)?)
394 }
395
396 Ok(ListResult {
397 common_prefixes: dirs,
398 objects: files,
399 })
400 }
401
402 async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
404 Ok(self
405 .client
406 .rename(&make_absolute_file(from), &make_absolute_file(to), true)
407 .await
408 .to_object_store_err()?)
409 }
410
411 async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
414 Ok(self
415 .client
416 .rename(&make_absolute_file(from), &make_absolute_file(to), false)
417 .await
418 .to_object_store_err()?)
419 }
420
421 async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
425 self.internal_copy(from, to, true).await
426 }
427
428 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
436 self.internal_copy(from, to, false).await
437 }
438}
439
/// Conversion from [`hdfs_native`] results into [`object_store`] results.
/// Exported publicly when the `integration-test` feature is enabled so external
/// tests can reuse the conversion.
#[cfg(feature = "integration-test")]
pub trait HdfsErrorConvert<T> {
    fn to_object_store_err(self) -> Result<T>;
}

/// Crate-private variant of the conversion trait used in normal builds.
#[cfg(not(feature = "integration-test"))]
trait HdfsErrorConvert<T> {
    fn to_object_store_err(self) -> Result<T>;
}
449
450impl<T> HdfsErrorConvert<T> for hdfs_native::Result<T> {
451 fn to_object_store_err(self) -> Result<T> {
452 self.map_err(|err| match err {
453 HdfsError::FileNotFound(path) => object_store::Error::NotFound {
454 path: path.clone(),
455 source: Box::new(HdfsError::FileNotFound(path)),
456 },
457 HdfsError::AlreadyExists(path) => object_store::Error::AlreadyExists {
458 path: path.clone(),
459 source: Box::new(HdfsError::AlreadyExists(path)),
460 },
461 _ => object_store::Error::Generic {
462 store: "HdfsObjectStore",
463 source: Box::new(err),
464 },
465 })
466 }
467}
468
/// Channel payload for the background writer task: each queued part carries a
/// oneshot sender used to report that part's write result back to the caller.
type PartSender = mpsc::UnboundedSender<(oneshot::Sender<Result<()>>, PutPayload)>;

/// Multipart upload that serializes all parts through a single background task
/// writing to a temporary HDFS file; `complete` renames the file into place.
struct HdfsMultipartWriter {
    client: Arc<Client>,
    // Join handle of the writer task plus the channel for queueing parts.
    // Set to `None` once `complete` or `abort` has been called.
    sender: Option<(JoinHandle<Result<()>>, PartSender)>,
    tmp_filename: String,
    final_filename: String,
}
481
impl HdfsMultipartWriter {
    /// Creates a multipart writer that streams parts into `writer` (an already
    /// open temporary file) via a dedicated background task.
    fn new(
        client: Arc<Client>,
        writer: FileWriter,
        tmp_filename: &str,
        final_filename: &str,
    ) -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();

        Self {
            client,
            sender: Some((Self::start_writer_task(writer, receiver), sender)),
            tmp_filename: tmp_filename.to_string(),
            final_filename: final_filename.to_string(),
        }
    }

    /// Spawns the task that writes queued parts to the file sequentially.
    ///
    /// Each part's outcome is reported through its oneshot sender. When the
    /// channel closes (all senders dropped), the file is closed and its result
    /// returned. After a failed write the task stops writing but keeps draining
    /// the channel, answering every remaining part with an error, so senders
    /// never observe a closed channel.
    fn start_writer_task(
        mut writer: FileWriter,
        mut part_receiver: mpsc::UnboundedReceiver<(oneshot::Sender<Result<()>>, PutPayload)>,
    ) -> JoinHandle<Result<()>> {
        task::spawn(async move {
            'outer: loop {
                match part_receiver.recv().await {
                    Some((sender, part)) => {
                        for bytes in part {
                            if let Err(e) = writer.write(bytes).await.to_object_store_err() {
                                // Report this part's failure, then fall through
                                // to the drain loop below.
                                let _ = sender.send(Err(e));
                                break 'outer;
                            }
                        }
                        let _ = sender.send(Ok(()));
                    }
                    None => {
                        // Channel closed: all parts written; finalize the file.
                        return writer.close().await.to_object_store_err();
                    }
                }
            }

            // A write failed; reject all remaining queued parts.
            while let Some((sender, _)) = part_receiver.recv().await {
                let _ = sender.send(
                    Err(HdfsError::OperationFailed(
                        "Write failed during one of the parts".to_string(),
                    ))
                    .to_object_store_err(),
                );
            }
            Err(HdfsError::OperationFailed(
                "Write failed during one of the parts".to_string(),
            ))
            .to_object_store_err()
        })
    }
}
537
538impl std::fmt::Debug for HdfsMultipartWriter {
539 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
540 f.debug_struct("HdfsMultipartWriter")
541 .field("tmp_filename", &self.tmp_filename)
542 .field("final_filename", &self.final_filename)
543 .finish()
544 }
545}
546
#[async_trait]
impl MultipartUpload for HdfsMultipartWriter {
    /// Queues a part for the background writer task and returns a future that
    /// resolves with that part's write result.
    fn put_part(&mut self, payload: PutPayload) -> UploadPart {
        let (result_sender, result_receiver) = oneshot::channel();

        if let Some((_, payload_sender)) = self.sender.as_ref() {
            // The writer task keeps receiving (even after a write failure) until
            // every sender is dropped, so this send cannot fail while
            // `self.sender` is still `Some`.
            payload_sender.send((result_sender, payload)).unwrap();
        } else {
            // `complete`/`abort` already ran; resolve the caller's future with
            // an error instead of queueing.
            result_sender
                .send(
                    Err(HdfsError::OperationFailed(
                        "Cannot put part after completing or aborting".to_string(),
                    ))
                    .to_object_store_err(),
                )
                .unwrap();
        }

        async { result_receiver.await.unwrap() }.boxed()
    }

    /// Closes the part channel, waits for all queued parts to be written and
    /// the file to be closed, then renames the temporary file to its final name.
    async fn complete(&mut self) -> Result<PutResult> {
        if let Some((handle, sender)) = self.sender.take() {
            // Dropping the sender closes the channel, letting the task finish.
            drop(sender);

            handle.await??;

            self.client
                .rename(&self.tmp_filename, &self.final_filename, true)
                .await
                .to_object_store_err()?;

            Ok(PutResult {
                e_tag: None,
                version: None,
            })
        } else {
            Err(object_store::Error::NotSupported {
                source: "Cannot call abort or complete multiple times".into(),
            })
        }
    }

    /// Cancels the background writer task and deletes the temporary file.
    async fn abort(&mut self) -> Result<()> {
        if let Some((handle, sender)) = self.sender.take() {
            drop(sender);

            handle.abort();

            self.client
                .delete(&self.tmp_filename, false)
                .await
                .to_object_store_err()?;

            Ok(())
        } else {
            Err(object_store::Error::NotSupported {
                source: "Cannot call abort or complete multiple times".into(),
            })
        }
    }
}
613
614fn make_absolute_file(path: &Path) -> String {
616 format!("/{}", path.as_ref())
617}
618
619fn make_absolute_dir(path: &Path) -> String {
620 if path.parts().count() > 0 {
621 format!("/{}/", path.as_ref())
622 } else {
623 "/".to_string()
624 }
625}
626
627fn get_object_meta(status: &FileStatus) -> Result<ObjectMeta> {
628 Ok(ObjectMeta {
629 location: Path::parse(&status.path)?,
630 last_modified: DateTime::<Utc>::from_timestamp(status.modification_time as i64, 0).unwrap(),
631 size: status.length,
632 e_tag: None,
633 version: None,
634 })
635}