1use std::{
16 collections::HashMap,
17 fmt::{Display, Formatter},
18 future,
19 path::PathBuf,
20 sync::Arc,
21};
22
23use async_trait::async_trait;
24use chrono::{DateTime, Utc};
25use futures::{
26 stream::{BoxStream, StreamExt},
27 FutureExt,
28};
29use hdfs_native::{client::FileStatus, file::FileWriter, Client, HdfsError, WriteOptions};
30use object_store::{
31 path::Path, GetOptions, GetRange, GetResult, GetResultPayload, ListResult, MultipartUpload,
32 ObjectMeta, ObjectStore, PutMode, PutMultipartOpts, PutOptions, PutPayload, PutResult, Result,
33 UploadPart,
34};
35use tokio::{
36 sync::{mpsc, oneshot},
37 task::{self, JoinHandle},
38};
39
40#[cfg(feature = "integration-test")]
42pub use hdfs_native::minidfs;
43
/// An `ObjectStore` implementation backed by a native HDFS [`Client`].
#[derive(Debug)]
pub struct HdfsObjectStore {
    // Shared HDFS client; also handed (via `Arc::clone`) to the background
    // multipart writer so uploads can outlive a single method call.
    client: Arc<Client>,
}
48
49impl HdfsObjectStore {
50 pub fn new(client: Arc<Client>) -> Self {
60 Self { client }
61 }
62
63 pub fn with_url(url: &str) -> Result<Self> {
74 Ok(Self::new(Arc::new(Client::new(url).to_object_store_err()?)))
75 }
76
77 pub fn with_config(url: &str, config: HashMap<String, String>) -> Result<Self> {
94 Ok(Self::new(Arc::new(
95 Client::new_with_config(url, config).to_object_store_err()?,
96 )))
97 }
98
99 async fn internal_copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
100 let overwrite = match self.client.get_file_info(&make_absolute_file(to)).await {
101 Ok(_) if overwrite => true,
102 Ok(_) => Err(HdfsError::AlreadyExists(make_absolute_file(to))).to_object_store_err()?,
103 Err(HdfsError::FileNotFound(_)) => false,
104 Err(e) => Err(e).to_object_store_err()?,
105 };
106
107 let write_options = WriteOptions {
108 overwrite,
109 ..Default::default()
110 };
111
112 let file = self
113 .client
114 .read(&make_absolute_file(from))
115 .await
116 .to_object_store_err()?;
117 let mut stream = file.read_range_stream(0, file.file_length()).boxed();
118
119 let mut new_file = self
120 .client
121 .create(&make_absolute_file(to), write_options)
122 .await
123 .to_object_store_err()?;
124
125 while let Some(bytes) = stream.next().await.transpose().to_object_store_err()? {
126 new_file.write(bytes).await.to_object_store_err()?;
127 }
128 new_file.close().await.to_object_store_err()?;
129
130 Ok(())
131 }
132
133 async fn open_tmp_file(&self, file_path: &str) -> Result<(FileWriter, String)> {
134 let path_buf = PathBuf::from(file_path);
135
136 let file_name = path_buf
137 .file_name()
138 .ok_or(HdfsError::InvalidPath("path missing filename".to_string()))
139 .to_object_store_err()?
140 .to_str()
141 .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
142 .to_object_store_err()?
143 .to_string();
144
145 let tmp_file_path = path_buf
146 .with_file_name(format!(".{}.tmp", file_name))
147 .to_str()
148 .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
149 .to_object_store_err()?
150 .to_string();
151
152 let mut index = 1;
154 loop {
155 let path = format!("{}.{}", tmp_file_path, index);
156 match self.client.create(&path, WriteOptions::default()).await {
157 Ok(writer) => break Ok((writer, path)),
158 Err(HdfsError::AlreadyExists(_)) => index += 1,
159 Err(e) => break Err(e).to_object_store_err(),
160 }
161 }
162 }
163}
164
165impl Display for HdfsObjectStore {
166 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
167 write!(f, "HdfsObjectStore")
168 }
169}
170
171impl From<Client> for HdfsObjectStore {
172 fn from(value: Client) -> Self {
173 Self::new(Arc::new(value))
174 }
175}
176
#[async_trait]
impl ObjectStore for HdfsObjectStore {
    /// Uploads `payload` to `location` atomically: bytes are written to a
    /// hidden temporary file first and renamed over the destination once
    /// fully written, so readers never observe a partial object.
    ///
    /// `PutMode::Update` is rejected — HDFS has no object versions to
    /// compare against.
    async fn put_opts(
        &self,
        location: &Path,
        payload: PutPayload,
        opts: PutOptions,
    ) -> Result<PutResult> {
        let overwrite = match opts.mode {
            PutMode::Create => false,
            PutMode::Overwrite => true,
            PutMode::Update(_) => {
                return Err(object_store::Error::NotSupported {
                    source: "Update mode not supported".to_string().into(),
                })
            }
        };

        let final_file_path = make_absolute_file(location);

        // In Create mode, fail fast if the destination already exists; the
        // final rename would also catch this, but only after the upload.
        if !overwrite && self.client.get_file_info(&final_file_path).await.is_ok() {
            return Err(HdfsError::AlreadyExists(final_file_path)).to_object_store_err();
        }

        let (mut tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;

        for bytes in payload {
            tmp_file.write(bytes).await.to_object_store_err()?;
        }
        tmp_file.close().await.to_object_store_err()?;

        // Publish the fully written temp file under the final name.
        self.client
            .rename(&tmp_file_path, &final_file_path, overwrite)
            .await
            .to_object_store_err()?;

        // HDFS provides neither etags nor versions.
        Ok(PutResult {
            e_tag: None,
            version: None,
        })
    }

    /// Starts a multipart upload backed by a temp file: parts are written
    /// sequentially by a background task, and the file is renamed into
    /// place on `complete`. `_opts` (tags/attributes) are ignored.
    async fn put_multipart_opts(
        &self,
        location: &Path,
        _opts: PutMultipartOpts,
    ) -> Result<Box<dyn MultipartUpload>> {
        let final_file_path = make_absolute_file(location);

        let (tmp_file, tmp_file_path) = self.open_tmp_file(&final_file_path).await?;

        Ok(Box::new(HdfsMultipartWriter::new(
            Arc::clone(&self.client),
            tmp_file,
            &tmp_file_path,
            &final_file_path,
        )))
    }

    /// Reads `location`, honoring an optional byte range. Conditional
    /// (if-match / modified-since) requests are not supported.
    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
        if options.if_match.is_some()
            || options.if_none_match.is_some()
            || options.if_modified_since.is_some()
            || options.if_unmodified_since.is_some()
        {
            return Err(object_store::Error::NotImplemented);
        }

        // head() also rejects directories, so past this point the path is a file.
        let meta = self.head(location).await?;

        // Resolve the requested range against the file size, defaulting to
        // the whole file. Suffix ranges are clamped to avoid underflow.
        let range = options
            .range
            .map(|r| match r {
                GetRange::Bounded(range) => range,
                GetRange::Offset(offset) => offset..meta.size,
                GetRange::Suffix(suffix) => meta.size.saturating_sub(suffix)..meta.size,
            })
            .unwrap_or(0..meta.size);

        let reader = self
            .client
            .read(&make_absolute_file(location))
            .await
            .to_object_store_err()?;
        let stream = reader
            .read_range_stream(range.start, range.end - range.start)
            .map(|b| b.to_object_store_err())
            .boxed();

        let payload = GetResultPayload::Stream(stream);

        Ok(GetResult {
            payload,
            meta,
            range,
            attributes: Default::default(),
        })
    }

    /// Returns metadata for the file at `location`; directories are an error.
    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
        let status = self
            .client
            .get_file_info(&make_absolute_file(location))
            .await
            .to_object_store_err()?;

        if status.isdir {
            return Err(HdfsError::IsADirectoryError(
                "Head must be called on a file".to_string(),
            ))
            .to_object_store_err();
        }

        get_object_meta(&status)
    }

    /// Deletes the file at `location` (non-recursive).
    async fn delete(&self, location: &Path) -> Result<()> {
        let result = self
            .client
            .delete(&make_absolute_file(location), false)
            .await
            .to_object_store_err()?;

        // HDFS reports some failures via a boolean rather than an error.
        if !result {
            Err(HdfsError::OperationFailed(
                "failed to delete object".to_string(),
            ))
            .to_object_store_err()?
        }

        Ok(())
    }

    /// Recursively lists all files under `prefix`. A missing prefix yields
    /// an empty stream rather than an error; directory entries are skipped.
    fn list(&self, prefix: Option<&Path>) -> BoxStream<'_, Result<ObjectMeta>> {
        let status_stream = self
            .client
            .list_status_iter(
                &prefix.map(make_absolute_dir).unwrap_or("".to_string()),
                true,
            )
            .into_stream()
            .filter(|res| {
                let result = match res {
                    Ok(status) => !status.isdir,
                    // A non-existent prefix becomes an empty listing.
                    Err(HdfsError::FileNotFound(_)) => false,
                    _ => true,
                };
                future::ready(result)
            })
            .map(|res| res.map_or_else(|e| Err(e).to_object_store_err(), |s| get_object_meta(&s)));

        Box::pin(status_stream)
    }

    /// Non-recursive listing of `prefix`: immediate child directories become
    /// `common_prefixes`, immediate child files become `objects`.
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
        let mut status_stream = self
            .client
            .list_status_iter(
                &prefix.map(make_absolute_dir).unwrap_or("".to_string()),
                false,
            )
            .into_stream()
            .filter(|res| {
                let result = match res {
                    // A non-existent prefix becomes an empty listing.
                    Err(HdfsError::FileNotFound(_)) => false,
                    _ => true,
                };
                future::ready(result)
            });

        let mut statuses = Vec::<FileStatus>::new();
        while let Some(status) = status_stream.next().await {
            statuses.push(status.to_object_store_err()?);
        }

        let mut dirs: Vec<Path> = Vec::new();
        for status in statuses.iter().filter(|s| s.isdir) {
            dirs.push(Path::parse(&status.path)?)
        }

        let mut files: Vec<ObjectMeta> = Vec::new();
        for status in statuses.iter().filter(|s| !s.isdir) {
            files.push(get_object_meta(status)?)
        }

        Ok(ListResult {
            common_prefixes: dirs,
            objects: files,
        })
    }

    /// Renames `from` to `to`, replacing `to` if it exists.
    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
        Ok(self
            .client
            .rename(&make_absolute_file(from), &make_absolute_file(to), true)
            .await
            .to_object_store_err()?)
    }

    /// Renames `from` to `to`, failing if `to` already exists.
    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        Ok(self
            .client
            .rename(&make_absolute_file(from), &make_absolute_file(to), false)
            .await
            .to_object_store_err()?)
    }

    /// Copies `from` to `to` by streaming the bytes, overwriting `to`.
    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
        self.internal_copy(from, to, true).await
    }

    /// Copies `from` to `to`, failing if `to` already exists.
    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        self.internal_copy(from, to, false).await
    }
}
432
/// Converts `hdfs_native` results into `object_store` results.
///
/// Public only under the `integration-test` feature so tests can use the
/// same conversion; private otherwise.
#[cfg(feature = "integration-test")]
pub trait HdfsErrorConvert<T> {
    fn to_object_store_err(self) -> Result<T>;
}

#[cfg(not(feature = "integration-test"))]
trait HdfsErrorConvert<T> {
    fn to_object_store_err(self) -> Result<T>;
}
442
443impl<T> HdfsErrorConvert<T> for hdfs_native::Result<T> {
444 fn to_object_store_err(self) -> Result<T> {
445 self.map_err(|err| match err {
446 HdfsError::FileNotFound(path) => object_store::Error::NotFound {
447 path: path.clone(),
448 source: Box::new(HdfsError::FileNotFound(path)),
449 },
450 HdfsError::AlreadyExists(path) => object_store::Error::AlreadyExists {
451 path: path.clone(),
452 source: Box::new(HdfsError::AlreadyExists(path)),
453 },
454 _ => object_store::Error::Generic {
455 store: "HdfsObjectStore",
456 source: Box::new(err),
457 },
458 })
459 }
460}
461
// Sends one upload part plus the oneshot used to report that part's write result.
type PartSender = mpsc::UnboundedSender<(oneshot::Sender<Result<()>>, PutPayload)>;
463
/// [`MultipartUpload`] implementation that funnels parts through a channel
/// to a single background task writing a temporary HDFS file, which is
/// renamed over the final path on `complete`.
struct HdfsMultipartWriter {
    client: Arc<Client>,
    // Writer-task handle plus the channel feeding it parts; `None` once
    // `complete` or `abort` has consumed them.
    sender: Option<(JoinHandle<Result<()>>, PartSender)>,
    tmp_filename: String,
    final_filename: String,
}
474
impl HdfsMultipartWriter {
    /// Wraps an already-open temp-file writer and spawns the background
    /// task that applies parts to it in submission order.
    fn new(
        client: Arc<Client>,
        writer: FileWriter,
        tmp_filename: &str,
        final_filename: &str,
    ) -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();

        Self {
            client,
            sender: Some((Self::start_writer_task(writer, receiver), sender)),
            tmp_filename: tmp_filename.to_string(),
            final_filename: final_filename.to_string(),
        }
    }

    /// Drains `part_receiver`, writing each part to `writer` and reporting
    /// each part's result through its oneshot sender.
    ///
    /// When the channel closes (the sender is dropped in `complete`), the
    /// file is closed and that result returned. If any write fails, the
    /// task stops writing, fails every remaining queued part, and returns
    /// an overall error.
    fn start_writer_task(
        mut writer: FileWriter,
        mut part_receiver: mpsc::UnboundedReceiver<(oneshot::Sender<Result<()>>, PutPayload)>,
    ) -> JoinHandle<Result<()>> {
        task::spawn(async move {
            'outer: loop {
                match part_receiver.recv().await {
                    Some((sender, part)) => {
                        for bytes in part {
                            if let Err(e) = writer.write(bytes).await.to_object_store_err() {
                                // The caller may have dropped its receiver;
                                // a failed notification is not an error here.
                                let _ = sender.send(Err(e));
                                break 'outer;
                            }
                        }
                        let _ = sender.send(Ok(()));
                    }
                    None => {
                        // Channel closed: all parts written, finalize the file.
                        return writer.close().await.to_object_store_err();
                    }
                }
            }

            // A write failed above: reject any parts still arriving so their
            // callers aren't left waiting forever, then report the failure.
            while let Some((sender, _)) = part_receiver.recv().await {
                let _ = sender.send(
                    Err(HdfsError::OperationFailed(
                        "Write failed during one of the parts".to_string(),
                    ))
                    .to_object_store_err(),
                );
            }
            Err(HdfsError::OperationFailed(
                "Write failed during one of the parts".to_string(),
            ))
            .to_object_store_err()
        })
    }
}
530
531impl std::fmt::Debug for HdfsMultipartWriter {
532 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
533 f.debug_struct("HdfsMultipartWriter")
534 .field("tmp_filename", &self.tmp_filename)
535 .field("final_filename", &self.final_filename)
536 .finish()
537 }
538}
539
#[async_trait]
impl MultipartUpload for HdfsMultipartWriter {
    /// Queues `payload` for the background writer task and returns a future
    /// that resolves once that part has been written (or failed).
    fn put_part(&mut self, payload: PutPayload) -> UploadPart {
        let (result_sender, result_receiver) = oneshot::channel();

        if let Some((_, payload_sender)) = self.sender.as_ref() {
            // The writer task keeps its receiver alive for as long as this
            // sender exists, so a send failure here would be a bug.
            payload_sender.send((result_sender, payload)).unwrap();
        } else {
            // complete()/abort() already ran: fail the part immediately.
            result_sender
                .send(
                    Err(HdfsError::OperationFailed(
                        "Cannot put part after completing or aborting".to_string(),
                    ))
                    .to_object_store_err(),
                )
                .unwrap();
        }

        // NOTE(review): unwrap assumes the writer task never drops a part's
        // oneshot without sending (e.g. on task panic) — confirm.
        async { result_receiver.await.unwrap() }.boxed()
    }

    /// Closes the part channel, waits for the writer task to finish the
    /// temp file, then renames it over the final path. Errors if called
    /// after a previous `complete` or `abort`.
    async fn complete(&mut self) -> Result<PutResult> {
        if let Some((handle, sender)) = self.sender.take() {
            // Dropping the sender closes the channel, telling the task to
            // close the file and exit.
            drop(sender);

            handle.await??;

            self.client
                .rename(&self.tmp_filename, &self.final_filename, true)
                .await
                .to_object_store_err()?;

            Ok(PutResult {
                e_tag: None,
                version: None,
            })
        } else {
            Err(object_store::Error::NotSupported {
                source: "Cannot call abort or complete multiple times".into(),
            })
        }
    }

    /// Cancels the writer task and deletes the temp file. Errors if called
    /// after a previous `complete` or `abort`.
    async fn abort(&mut self) -> Result<()> {
        if let Some((handle, sender)) = self.sender.take() {
            drop(sender);

            handle.abort();

            self.client
                .delete(&self.tmp_filename, false)
                .await
                .to_object_store_err()?;

            Ok(())
        } else {
            Err(object_store::Error::NotSupported {
                source: "Cannot call abort or complete multiple times".into(),
            })
        }
    }
}
606
607fn make_absolute_file(path: &Path) -> String {
609 format!("/{}", path.as_ref())
610}
611
612fn make_absolute_dir(path: &Path) -> String {
613 if path.parts().count() > 0 {
614 format!("/{}/", path.as_ref())
615 } else {
616 "/".to_string()
617 }
618}
619
620fn get_object_meta(status: &FileStatus) -> Result<ObjectMeta> {
621 Ok(ObjectMeta {
622 location: Path::parse(&status.path)?,
623 last_modified: DateTime::<Utc>::from_timestamp_millis(status.modification_time as i64)
624 .unwrap(),
625 size: status.length,
626 e_tag: None,
627 version: None,
628 })
629}