1use std::fs::{metadata, symlink_metadata, File, Metadata, OpenOptions};
20use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
21use std::ops::Range;
22use std::sync::Arc;
23use std::time::SystemTime;
24use std::{collections::BTreeSet, io};
25use std::{collections::VecDeque, path::PathBuf};
26
27use async_trait::async_trait;
28use bytes::Bytes;
29use chrono::{DateTime, Utc};
30use futures::{stream::BoxStream, StreamExt};
31use futures::{FutureExt, TryStreamExt};
32use parking_lot::Mutex;
33use url::Url;
34use walkdir::{DirEntry, WalkDir};
35
36use crate::{
37 maybe_spawn_blocking,
38 path::{absolute_path_to_url, Path},
39 util::InvalidGetRange,
40 Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
41 ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
42 UploadPart,
43};
44
/// Errors produced by the local-filesystem store.
///
/// These are converted into the crate-level error type by the
/// `From<Error> for super::Error` implementation in this file.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
    /// A directory walk failed with something other than "not found".
    #[error("Unable to walk dir: {}", source)]
    UnableToWalkDir { source: walkdir::Error },

    /// Reading filesystem metadata for `path` failed.
    #[error("Unable to access metadata for {}: {}", path, source)]
    Metadata {
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
        path: String,
    },

    /// Writing payload bytes into a (staged) file failed.
    #[error("Unable to copy data to file: {}", source)]
    UnableToCopyDataToFile { source: io::Error },

    /// Moving or linking a staged file to its final location failed.
    #[error("Unable to rename file: {}", source)]
    UnableToRenameFile { source: io::Error },

    /// Creating the directory `path` (and its ancestors) failed.
    #[error("Unable to create dir {}: {}", path.display(), source)]
    UnableToCreateDir { source: io::Error, path: PathBuf },

    /// The file at `path` could not be created.
    #[error("Unable to create file {}: {}", path.display(), source)]
    UnableToCreateFile { source: io::Error, path: PathBuf },

    /// Removing the file at `path` failed.
    #[error("Unable to delete file {}: {}", path.display(), source)]
    UnableToDeleteFile { source: io::Error, path: PathBuf },

    /// Opening the file at `path` failed for a reason other than "not found".
    #[error("Unable to open file {}: {}", path.display(), source)]
    UnableToOpenFile { source: io::Error, path: PathBuf },

    /// Reading bytes from the file at `path` failed.
    #[error("Unable to read data from file {}: {}", path.display(), source)]
    UnableToReadBytes { source: io::Error, path: PathBuf },

    /// A read returned `actual` bytes where `expected` bytes were required.
    #[error("Out of range of file {}, expected: {}, actual: {}", path.display(), expected, actual)]
    OutOfRange {
        path: PathBuf,
        expected: u64,
        actual: u64,
    },

    /// The requested byte range could not be applied to the file.
    #[error("Requested range was invalid")]
    InvalidRange { source: InvalidGetRange },

    /// Copying (or, in `rename`, moving) `from` to `to` failed.
    #[error("Unable to copy file from {} to {}: {}", from.display(), to.display(), source)]
    UnableToCopyFile {
        from: PathBuf,
        to: PathBuf,
        source: io::Error,
    },

    /// Nothing usable exists at `path`; also reported when `path` is a directory.
    #[error("NotFound")]
    NotFound { path: PathBuf, source: io::Error },

    /// Seeking within the file at `path` failed.
    #[error("Error seeking file {}: {}", path.display(), source)]
    Seek { source: io::Error, path: PathBuf },

    /// `url` has no representation as a path on this filesystem.
    #[error("Unable to convert URL \"{}\" to filesystem path", url)]
    InvalidUrl { url: Url },

    /// The destination `path` already exists and must not be overwritten.
    #[error("AlreadyExists")]
    AlreadyExists { path: String, source: io::Error },

    /// The prefix handed to `new_with_prefix` could not be canonicalized.
    #[error("Unable to canonicalize filesystem root: {}", path.display())]
    UnableToCanonicalize { path: PathBuf, source: io::Error },

    /// The filename looks like a staged-upload name (`#` followed only by digits).
    #[error("Filenames containing trailing '/#\\d+/' are not supported: {}", path)]
    InvalidPath { path: String },

    /// The multipart upload was already completed or aborted.
    #[error("Upload aborted")]
    Aborted,
}
116
117impl From<Error> for super::Error {
118 fn from(source: Error) -> Self {
119 match source {
120 Error::NotFound { path, source } => Self::NotFound {
121 path: path.to_string_lossy().to_string(),
122 source: source.into(),
123 },
124 Error::AlreadyExists { path, source } => Self::AlreadyExists {
125 path,
126 source: source.into(),
127 },
128 _ => Self::Generic {
129 store: "LocalFileSystem",
130 source: Box::new(source),
131 },
132 }
133 }
134}
135
/// An `ObjectStore` implementation backed by the local filesystem.
#[derive(Debug)]
pub struct LocalFileSystem {
    // Shared, immutable configuration (the root URL); cloned into blocking tasks.
    config: Arc<Config>,
    // When true, `delete` also removes parent directories left empty, stopping at the root.
    automatic_cleanup: bool,
}
202
/// Immutable settings shared between a `LocalFileSystem` and its blocking tasks.
#[derive(Debug)]
struct Config {
    // `file://` URL of the directory that object paths are resolved against.
    root: Url,
}
207
208impl std::fmt::Display for LocalFileSystem {
209 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210 write!(f, "LocalFileSystem({})", self.config.root)
211 }
212}
213
impl Default for LocalFileSystem {
    /// Equivalent to [`LocalFileSystem::new`].
    fn default() -> Self {
        Self::new()
    }
}
219
220impl LocalFileSystem {
221 pub fn new() -> Self {
223 Self {
224 config: Arc::new(Config {
225 root: Url::parse("file:///").unwrap(),
226 }),
227 automatic_cleanup: false,
228 }
229 }
230
231 pub fn new_with_prefix(prefix: impl AsRef<std::path::Path>) -> Result<Self> {
236 let path = std::fs::canonicalize(&prefix).map_err(|source| {
237 let path = prefix.as_ref().into();
238 Error::UnableToCanonicalize { source, path }
239 })?;
240
241 Ok(Self {
242 config: Arc::new(Config {
243 root: absolute_path_to_url(path)?,
244 }),
245 automatic_cleanup: false,
246 })
247 }
248
249 pub fn path_to_filesystem(&self, location: &Path) -> Result<PathBuf> {
251 if !is_valid_file_path(location) {
252 let path = location.as_ref().into();
253 let error = Error::InvalidPath { path };
254 return Err(error.into());
255 }
256
257 let path = self.config.prefix_to_filesystem(location)?;
258
259 #[cfg(target_os = "windows")]
260 let path = {
261 let path = path.to_string_lossy();
262
263 let mut out = String::new();
265 let drive = &path[..2]; let filepath = &path[2..].replace(':', "%3A"); out.push_str(drive);
268 out.push_str(filepath);
269 PathBuf::from(out)
270 };
271
272 Ok(path)
273 }
274
275 pub fn with_automatic_cleanup(mut self, automatic_cleanup: bool) -> Self {
277 self.automatic_cleanup = automatic_cleanup;
278 self
279 }
280}
281
282impl Config {
283 fn prefix_to_filesystem(&self, location: &Path) -> Result<PathBuf> {
285 let mut url = self.root.clone();
286 url.path_segments_mut()
287 .expect("url path")
288 .pop_if_empty()
291 .extend(location.parts());
292
293 url.to_file_path()
294 .map_err(|_| Error::InvalidUrl { url }.into())
295 }
296
297 fn filesystem_to_path(&self, location: &std::path::Path) -> Result<Path> {
299 Ok(Path::from_absolute_path_with_base(
300 location,
301 Some(&self.root),
302 )?)
303 }
304}
305
306fn is_valid_file_path(path: &Path) -> bool {
307 match path.filename() {
308 Some(p) => match p.split_once('#') {
309 Some((_, suffix)) if !suffix.is_empty() => {
310 !suffix.as_bytes().iter().all(|x| x.is_ascii_digit())
312 }
313 _ => true,
314 },
315 None => false,
316 }
317}
318
319#[async_trait]
320impl ObjectStore for LocalFileSystem {
321 async fn put_opts(
322 &self,
323 location: &Path,
324 payload: PutPayload,
325 opts: PutOptions,
326 ) -> Result<PutResult> {
327 if matches!(opts.mode, PutMode::Update(_)) {
328 return Err(crate::Error::NotImplemented);
329 }
330
331 if !opts.attributes.is_empty() {
332 return Err(crate::Error::NotImplemented);
333 }
334
335 let path = self.path_to_filesystem(location)?;
336 maybe_spawn_blocking(move || {
337 let (mut file, staging_path) = new_staged_upload(&path)?;
338 let mut e_tag = None;
339
340 let err = match payload.iter().try_for_each(|x| file.write_all(x)) {
341 Ok(_) => {
342 let metadata = file.metadata().map_err(|e| Error::Metadata {
343 source: e.into(),
344 path: path.to_string_lossy().to_string(),
345 })?;
346 e_tag = Some(get_etag(&metadata));
347 match opts.mode {
348 PutMode::Overwrite => {
349 std::mem::drop(file);
352 match std::fs::rename(&staging_path, &path) {
353 Ok(_) => None,
354 Err(source) => Some(Error::UnableToRenameFile { source }),
355 }
356 }
357 PutMode::Create => match std::fs::hard_link(&staging_path, &path) {
358 Ok(_) => {
359 let _ = std::fs::remove_file(&staging_path); None
361 }
362 Err(source) => match source.kind() {
363 ErrorKind::AlreadyExists => Some(Error::AlreadyExists {
364 path: path.to_str().unwrap().to_string(),
365 source,
366 }),
367 _ => Some(Error::UnableToRenameFile { source }),
368 },
369 },
370 PutMode::Update(_) => unreachable!(),
371 }
372 }
373 Err(source) => Some(Error::UnableToCopyDataToFile { source }),
374 };
375
376 if let Some(err) = err {
377 let _ = std::fs::remove_file(&staging_path); return Err(err.into());
379 }
380
381 Ok(PutResult {
382 e_tag,
383 version: None,
384 })
385 })
386 .await
387 }
388
389 async fn put_multipart_opts(
390 &self,
391 location: &Path,
392 opts: PutMultipartOptions,
393 ) -> Result<Box<dyn MultipartUpload>> {
394 if !opts.attributes.is_empty() {
395 return Err(crate::Error::NotImplemented);
396 }
397
398 let dest = self.path_to_filesystem(location)?;
399 let (file, src) = new_staged_upload(&dest)?;
400 Ok(Box::new(LocalUpload::new(src, dest, file)))
401 }
402
403 async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
404 let location = location.clone();
405 let path = self.path_to_filesystem(&location)?;
406 maybe_spawn_blocking(move || {
407 let (file, metadata) = open_file(&path)?;
408 let meta = convert_metadata(metadata, location);
409 options.check_preconditions(&meta)?;
410
411 let range = match options.range {
412 Some(r) => r
413 .as_range(meta.size)
414 .map_err(|source| Error::InvalidRange { source })?,
415 None => 0..meta.size,
416 };
417
418 Ok(GetResult {
419 payload: GetResultPayload::File(file, path),
420 attributes: Attributes::default(),
421 range,
422 meta,
423 })
424 })
425 .await
426 }
427
428 async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
429 let path = self.path_to_filesystem(location)?;
430 maybe_spawn_blocking(move || {
431 let (mut file, metadata) = open_file(&path)?;
432 read_range(&mut file, metadata.len(), &path, range)
433 })
434 .await
435 }
436
437 async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
438 let path = self.path_to_filesystem(location)?;
439 let ranges = ranges.to_vec();
440 maybe_spawn_blocking(move || {
441 let (mut file, metadata) = open_file(&path)?;
443 ranges
444 .into_iter()
445 .map(|r| read_range(&mut file, metadata.len(), &path, r))
446 .collect()
447 })
448 .await
449 }
450
451 async fn delete(&self, location: &Path) -> Result<()> {
452 let config = Arc::clone(&self.config);
453 let path = self.path_to_filesystem(location)?;
454 let automactic_cleanup = self.automatic_cleanup;
455 maybe_spawn_blocking(move || {
456 if let Err(e) = std::fs::remove_file(&path) {
457 Err(match e.kind() {
458 ErrorKind::NotFound => Error::NotFound { path, source: e }.into(),
459 _ => Error::UnableToDeleteFile { path, source: e }.into(),
460 })
461 } else if automactic_cleanup {
462 let root = &config.root;
463 let root = root
464 .to_file_path()
465 .map_err(|_| Error::InvalidUrl { url: root.clone() })?;
466
467 let mut parent = path.parent();
469
470 while let Some(loc) = parent {
471 if loc != root && std::fs::remove_dir(loc).is_ok() {
472 parent = loc.parent();
473 } else {
474 break;
475 }
476 }
477
478 Ok(())
479 } else {
480 Ok(())
481 }
482 })
483 .await
484 }
485
/// Streams the metadata of every object under `prefix`.
///
/// Delegates to `list_with_maybe_offset` without an offset.
fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
    self.list_with_maybe_offset(prefix, None)
}
489
/// Streams the metadata of objects under `prefix` whose path sorts strictly
/// after `offset`.
///
/// Delegates to `list_with_maybe_offset` with `Some(offset)`.
fn list_with_offset(
    &self,
    prefix: Option<&Path>,
    offset: &Path,
) -> BoxStream<'static, Result<ObjectMeta>> {
    self.list_with_maybe_offset(prefix, Some(offset))
}
497
498 async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
499 let config = Arc::clone(&self.config);
500
501 let prefix = prefix.cloned().unwrap_or_default();
502 let resolved_prefix = config.prefix_to_filesystem(&prefix)?;
503
504 maybe_spawn_blocking(move || {
505 let walkdir = WalkDir::new(&resolved_prefix)
506 .min_depth(1)
507 .max_depth(1)
508 .follow_links(true);
509
510 let mut common_prefixes = BTreeSet::new();
511 let mut objects = Vec::new();
512
513 for entry_res in walkdir.into_iter().map(convert_walkdir_result) {
514 if let Some(entry) = entry_res? {
515 let is_directory = entry.file_type().is_dir();
516 let entry_location = config.filesystem_to_path(entry.path())?;
517 if !is_directory && !is_valid_file_path(&entry_location) {
518 continue;
519 }
520
521 let mut parts = match entry_location.prefix_match(&prefix) {
522 Some(parts) => parts,
523 None => continue,
524 };
525
526 let common_prefix = match parts.next() {
527 Some(p) => p,
528 None => continue,
529 };
530
531 drop(parts);
532
533 if is_directory {
534 common_prefixes.insert(prefix.child(common_prefix));
535 } else if let Some(metadata) = convert_entry(entry, entry_location)? {
536 objects.push(metadata);
537 }
538 }
539 }
540
541 Ok(ListResult {
542 common_prefixes: common_prefixes.into_iter().collect(),
543 objects,
544 })
545 })
546 .await
547 }
548
549 async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
550 let from = self.path_to_filesystem(from)?;
551 let to = self.path_to_filesystem(to)?;
552 let mut id = 0;
553 maybe_spawn_blocking(move || loop {
560 let staged = staged_upload_path(&to, &id.to_string());
561 match std::fs::hard_link(&from, &staged) {
562 Ok(_) => {
563 return std::fs::rename(&staged, &to).map_err(|source| {
564 let _ = std::fs::remove_file(&staged); Error::UnableToCopyFile { from, to, source }.into()
566 });
567 }
568 Err(source) => match source.kind() {
569 ErrorKind::AlreadyExists => id += 1,
570 ErrorKind::NotFound => match from.exists() {
571 true => create_parent_dirs(&to, source)?,
572 false => return Err(Error::NotFound { path: from, source }.into()),
573 },
574 _ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
575 },
576 }
577 })
578 .await
579 }
580
581 async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
582 let from = self.path_to_filesystem(from)?;
583 let to = self.path_to_filesystem(to)?;
584 maybe_spawn_blocking(move || loop {
585 match std::fs::rename(&from, &to) {
586 Ok(_) => return Ok(()),
587 Err(source) => match source.kind() {
588 ErrorKind::NotFound => match from.exists() {
589 true => create_parent_dirs(&to, source)?,
590 false => return Err(Error::NotFound { path: from, source }.into()),
591 },
592 _ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
593 },
594 }
595 })
596 .await
597 }
598
599 async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
600 let from = self.path_to_filesystem(from)?;
601 let to = self.path_to_filesystem(to)?;
602
603 maybe_spawn_blocking(move || loop {
604 match std::fs::hard_link(&from, &to) {
605 Ok(_) => return Ok(()),
606 Err(source) => match source.kind() {
607 ErrorKind::AlreadyExists => {
608 return Err(Error::AlreadyExists {
609 path: to.to_str().unwrap().to_string(),
610 source,
611 }
612 .into())
613 }
614 ErrorKind::NotFound => match from.exists() {
615 true => create_parent_dirs(&to, source)?,
616 false => return Err(Error::NotFound { path: from, source }.into()),
617 },
618 _ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
619 },
620 }
621 })
622 .await
623 }
624}
625
626impl LocalFileSystem {
627 fn list_with_maybe_offset(
628 &self,
629 prefix: Option<&Path>,
630 maybe_offset: Option<&Path>,
631 ) -> BoxStream<'static, Result<ObjectMeta>> {
632 let config = Arc::clone(&self.config);
633
634 let root_path = match prefix {
635 Some(prefix) => match config.prefix_to_filesystem(prefix) {
636 Ok(path) => path,
637 Err(e) => return futures::future::ready(Err(e)).into_stream().boxed(),
638 },
639 None => config.root.to_file_path().unwrap(),
640 };
641
642 let walkdir = WalkDir::new(root_path)
643 .min_depth(1)
645 .follow_links(true);
646
647 let maybe_offset = maybe_offset.cloned();
648
649 let s = walkdir.into_iter().flat_map(move |result_dir_entry| {
650 if let (Some(offset), Ok(entry)) = (maybe_offset.as_ref(), result_dir_entry.as_ref()) {
653 let location = config.filesystem_to_path(entry.path());
654 match location {
655 Ok(path) if path <= *offset => return None,
656 Err(e) => return Some(Err(e)),
657 _ => {}
658 }
659 }
660
661 let entry = match convert_walkdir_result(result_dir_entry).transpose()? {
662 Ok(entry) => entry,
663 Err(e) => return Some(Err(e)),
664 };
665
666 if !entry.path().is_file() {
667 return None;
668 }
669
670 match config.filesystem_to_path(entry.path()) {
671 Ok(path) => match is_valid_file_path(&path) {
672 true => convert_entry(entry, path).transpose(),
673 false => None,
674 },
675 Err(e) => Some(Err(e)),
676 }
677 });
678
679 if tokio::runtime::Handle::try_current().is_err() {
682 return futures::stream::iter(s).boxed();
683 }
684
685 const CHUNK_SIZE: usize = 1024;
687
688 let buffer = VecDeque::with_capacity(CHUNK_SIZE);
689 futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move {
690 if buffer.is_empty() {
691 (s, buffer) = tokio::task::spawn_blocking(move || {
692 for _ in 0..CHUNK_SIZE {
693 match s.next() {
694 Some(r) => buffer.push_back(r),
695 None => break,
696 }
697 }
698 (s, buffer)
699 })
700 .await?;
701 }
702
703 match buffer.pop_front() {
704 Some(Err(e)) => Err(e),
705 Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))),
706 None => Ok(None),
707 }
708 })
709 .boxed()
710 }
711}
712
713fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> {
715 let parent = path.parent().ok_or_else(|| {
716 let path = path.to_path_buf();
717 Error::UnableToCreateFile { path, source }
718 })?;
719
720 std::fs::create_dir_all(parent).map_err(|source| {
721 let path = parent.into();
722 Error::UnableToCreateDir { source, path }
723 })?;
724 Ok(())
725}
726
727fn new_staged_upload(base: &std::path::Path) -> Result<(File, PathBuf)> {
731 let mut multipart_id = 1;
732 loop {
733 let suffix = multipart_id.to_string();
734 let path = staged_upload_path(base, &suffix);
735 let mut options = OpenOptions::new();
736 match options.read(true).write(true).create_new(true).open(&path) {
737 Ok(f) => return Ok((f, path)),
738 Err(source) => match source.kind() {
739 ErrorKind::AlreadyExists => multipart_id += 1,
740 ErrorKind::NotFound => create_parent_dirs(&path, source)?,
741 _ => return Err(Error::UnableToOpenFile { source, path }.into()),
742 },
743 }
744 }
745}
746
/// Builds the staging path for `dest` by appending `#` and `suffix` to it.
fn staged_upload_path(dest: &std::path::Path, suffix: &str) -> PathBuf {
    let mut staged = dest.to_path_buf().into_os_string();
    staged.push("#");
    staged.push(suffix);
    PathBuf::from(staged)
}
754
/// An in-progress multipart upload that writes parts into a staged file.
#[derive(Debug)]
struct LocalUpload {
    // Destination path and staged file handle, shared with blocking tasks.
    state: Arc<UploadState>,
    // Path of the staged file; `None` once the upload was completed or aborted.
    src: Option<PathBuf>,
    // Byte offset at which the next part will be written.
    offset: u64,
}
764
/// State of a `LocalUpload` that is shared with its blocking tasks.
#[derive(Debug)]
struct UploadState {
    // Final location the staged file is renamed to on completion.
    dest: PathBuf,
    // Handle of the staged file; locked while a part is written.
    file: Mutex<File>,
}
770
771impl LocalUpload {
772 pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File) -> Self {
773 Self {
774 state: Arc::new(UploadState {
775 dest,
776 file: Mutex::new(file),
777 }),
778 src: Some(src),
779 offset: 0,
780 }
781 }
782}
783
784#[async_trait]
785impl MultipartUpload for LocalUpload {
786 fn put_part(&mut self, data: PutPayload) -> UploadPart {
787 let offset = self.offset;
788 self.offset += data.content_length() as u64;
789
790 let s = Arc::clone(&self.state);
791 maybe_spawn_blocking(move || {
792 let mut file = s.file.lock();
793 file.seek(SeekFrom::Start(offset)).map_err(|source| {
794 let path = s.dest.clone();
795 Error::Seek { source, path }
796 })?;
797
798 data.iter()
799 .try_for_each(|x| file.write_all(x))
800 .map_err(|source| Error::UnableToCopyDataToFile { source })?;
801
802 Ok(())
803 })
804 .boxed()
805 }
806
807 async fn complete(&mut self) -> Result<PutResult> {
808 let src = self.src.take().ok_or(Error::Aborted)?;
809 let s = Arc::clone(&self.state);
810 maybe_spawn_blocking(move || {
811 let file = s.file.lock();
813 std::fs::rename(&src, &s.dest)
814 .map_err(|source| Error::UnableToRenameFile { source })?;
815 let metadata = file.metadata().map_err(|e| Error::Metadata {
816 source: e.into(),
817 path: src.to_string_lossy().to_string(),
818 })?;
819
820 Ok(PutResult {
821 e_tag: Some(get_etag(&metadata)),
822 version: None,
823 })
824 })
825 .await
826 }
827
828 async fn abort(&mut self) -> Result<()> {
829 let src = self.src.take().ok_or(Error::Aborted)?;
830 maybe_spawn_blocking(move || {
831 std::fs::remove_file(&src)
832 .map_err(|source| Error::UnableToDeleteFile { source, path: src })?;
833 Ok(())
834 })
835 .await
836 }
837}
838
839impl Drop for LocalUpload {
840 fn drop(&mut self) {
841 if let Some(src) = self.src.take() {
842 match tokio::runtime::Handle::try_current() {
844 Ok(r) => drop(r.spawn_blocking(move || std::fs::remove_file(src))),
845 Err(_) => drop(std::fs::remove_file(src)),
846 };
847 }
848 }
849}
850
851pub(crate) fn chunked_stream(
852 mut file: File,
853 path: PathBuf,
854 range: Range<u64>,
855 chunk_size: usize,
856) -> BoxStream<'static, Result<Bytes, super::Error>> {
857 futures::stream::once(async move {
858 let (file, path) = maybe_spawn_blocking(move || {
859 file.seek(SeekFrom::Start(range.start as _))
860 .map_err(|source| Error::Seek {
861 source,
862 path: path.clone(),
863 })?;
864 Ok((file, path))
865 })
866 .await?;
867
868 let stream = futures::stream::try_unfold(
869 (file, path, range.end - range.start),
870 move |(mut file, path, remaining)| {
871 maybe_spawn_blocking(move || {
872 if remaining == 0 {
873 return Ok(None);
874 }
875
876 let to_read = remaining.min(chunk_size as u64);
877 let cap = usize::try_from(to_read).map_err(|_e| Error::InvalidRange {
878 source: InvalidGetRange::TooLarge {
879 requested: to_read,
880 max: usize::MAX as u64,
881 },
882 })?;
883 let mut buffer = Vec::with_capacity(cap);
884 let read = (&mut file)
885 .take(to_read)
886 .read_to_end(&mut buffer)
887 .map_err(|e| Error::UnableToReadBytes {
888 source: e,
889 path: path.clone(),
890 })?;
891
892 Ok(Some((buffer.into(), (file, path, remaining - read as u64))))
893 })
894 },
895 );
896 Ok::<_, super::Error>(stream)
897 })
898 .try_flatten()
899 .boxed()
900}
901
902pub(crate) fn read_range(
903 file: &mut File,
904 file_len: u64,
905 path: &PathBuf,
906 range: Range<u64>,
907) -> Result<Bytes> {
908 if range.start >= file_len {
911 return Err(Error::InvalidRange {
912 source: InvalidGetRange::StartTooLarge {
913 requested: range.start,
914 length: file_len,
915 },
916 }
917 .into());
918 }
919
920 let to_read = range.end.min(file_len) - range.start;
922
923 file.seek(SeekFrom::Start(range.start)).map_err(|source| {
924 let path = path.into();
925 Error::Seek { source, path }
926 })?;
927
928 let mut buf = Vec::with_capacity(to_read as usize);
929 let read = file.take(to_read).read_to_end(&mut buf).map_err(|source| {
930 let path = path.into();
931 Error::UnableToReadBytes { source, path }
932 })? as u64;
933
934 if read != to_read {
935 let error = Error::OutOfRange {
936 path: path.into(),
937 expected: to_read,
938 actual: read,
939 };
940
941 return Err(error.into());
942 }
943
944 Ok(buf.into())
945}
946
947fn open_file(path: &PathBuf) -> Result<(File, Metadata)> {
948 let ret = match File::open(path).and_then(|f| Ok((f.metadata()?, f))) {
949 Err(e) => Err(match e.kind() {
950 ErrorKind::NotFound => Error::NotFound {
951 path: path.clone(),
952 source: e,
953 },
954 _ => Error::UnableToOpenFile {
955 path: path.clone(),
956 source: e,
957 },
958 }),
959 Ok((metadata, file)) => match !metadata.is_dir() {
960 true => Ok((file, metadata)),
961 false => Err(Error::NotFound {
962 path: path.clone(),
963 source: io::Error::new(ErrorKind::NotFound, "is directory"),
964 }),
965 },
966 }?;
967 Ok(ret)
968}
969
970fn convert_entry(entry: DirEntry, location: Path) -> Result<Option<ObjectMeta>> {
971 match entry.metadata() {
972 Ok(metadata) => Ok(Some(convert_metadata(metadata, location))),
973 Err(e) => {
974 if let Some(io_err) = e.io_error() {
975 if io_err.kind() == ErrorKind::NotFound {
976 return Ok(None);
977 }
978 }
979 Err(Error::Metadata {
980 source: e.into(),
981 path: location.to_string(),
982 })?
983 }
984 }
985}
986
987fn last_modified(metadata: &Metadata) -> DateTime<Utc> {
988 metadata
989 .modified()
990 .expect("Modified file time should be supported on this platform")
991 .into()
992}
993
994fn get_etag(metadata: &Metadata) -> String {
995 let inode = get_inode(metadata);
996 let size = metadata.len();
997 let mtime = metadata
998 .modified()
999 .ok()
1000 .and_then(|mtime| mtime.duration_since(SystemTime::UNIX_EPOCH).ok())
1001 .unwrap_or_default()
1002 .as_micros();
1003
1004 format!("{inode:x}-{mtime:x}-{size:x}")
1008}
1009
1010fn convert_metadata(metadata: Metadata, location: Path) -> ObjectMeta {
1011 let last_modified = last_modified(&metadata);
1012
1013 ObjectMeta {
1014 location,
1015 last_modified,
1016 size: metadata.len(),
1017 e_tag: Some(get_etag(&metadata)),
1018 version: None,
1019 }
1020}
1021
/// Returns the inode number recorded in `metadata` (Unix only).
#[cfg(unix)]
fn get_inode(metadata: &Metadata) -> u64 {
    use std::os::unix::fs::MetadataExt;

    metadata.ino()
}
1028
/// Fallback for non-Unix targets: always returns 0, so the inode contributes
/// nothing to distinguishing ETags there.
#[cfg(not(unix))]
fn get_inode(_metadata: &Metadata) -> u64 {
    0
}
1034
1035fn convert_walkdir_result(
1038 res: std::result::Result<DirEntry, walkdir::Error>,
1039) -> Result<Option<DirEntry>> {
1040 match res {
1041 Ok(entry) => {
1042 match symlink_metadata(entry.path()) {
1045 Ok(attr) => {
1046 if attr.is_symlink() {
1047 let target_metadata = metadata(entry.path());
1048 match target_metadata {
1049 Ok(_) => {
1050 Ok(Some(entry))
1052 }
1053 Err(_) => {
1054 Ok(None)
1056 }
1057 }
1058 } else {
1059 Ok(Some(entry))
1060 }
1061 }
1062 Err(_) => Ok(None),
1063 }
1064 }
1065
1066 Err(walkdir_err) => match walkdir_err.io_error() {
1067 Some(io_err) => match io_err.kind() {
1068 ErrorKind::NotFound => Ok(None),
1069 _ => Err(Error::UnableToWalkDir {
1070 source: walkdir_err,
1071 }
1072 .into()),
1073 },
1074 None => Err(Error::UnableToWalkDir {
1075 source: walkdir_err,
1076 }
1077 .into()),
1078 },
1079 }
1080}
1081
1082#[cfg(test)]
1083mod tests {
1084 use std::fs;
1085
1086 use futures::TryStreamExt;
1087 use tempfile::TempDir;
1088
1089 #[cfg(target_family = "unix")]
1090 use tempfile::NamedTempFile;
1091
1092 use crate::integration::*;
1093
1094 use super::*;
1095
/// Runs the shared `crate::integration` helpers against a store rooted in a
/// fresh temporary directory, inside a Tokio runtime.
#[tokio::test]
#[cfg(target_family = "unix")]
async fn file_test() {
    let root = TempDir::new().unwrap();
    let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

    put_get_delete_list(&integration).await;
    get_opts(&integration).await;
    list_uses_directories_correctly(&integration).await;
    list_with_delimiter(&integration).await;
    rename_and_copy(&integration).await;
    copy_if_not_exists(&integration).await;
    copy_rename_nonexistent_object(&integration).await;
    stream_get(&integration).await;
    put_opts(&integration, false).await;
}
1112
/// Exercises the store on the `futures` executor, i.e. without a Tokio
/// runtime, including a two-part multipart upload.
#[test]
#[cfg(target_family = "unix")]
fn test_non_tokio() {
    let root = TempDir::new().unwrap();
    let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
    futures::executor::block_on(async move {
        put_get_delete_list(&integration).await;
        list_uses_directories_correctly(&integration).await;
        list_with_delimiter(&integration).await;

        let p = Path::from("manual_upload");
        let mut upload = integration.put_multipart(&p).await.unwrap();
        upload.put_part("123".into()).await.unwrap();
        upload.put_part("45678".into()).await.unwrap();
        let r = upload.complete().await.unwrap();

        // The ETag returned on completion must match the one reported by `get`.
        let get = integration.get(&p).await.unwrap();
        assert_eq!(get.meta.e_tag.as_ref().unwrap(), r.e_tag.as_ref().unwrap());
        let actual = get.bytes().await.unwrap();
        assert_eq!(actual.as_ref(), b"12345678");
    });
}
1136
1137 #[tokio::test]
1138 async fn creates_dir_if_not_present() {
1139 let root = TempDir::new().unwrap();
1140 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1141
1142 let location = Path::from("nested/file/test_file");
1143
1144 let data = Bytes::from("arbitrary data");
1145
1146 integration
1147 .put(&location, data.clone().into())
1148 .await
1149 .unwrap();
1150
1151 let read_data = integration
1152 .get(&location)
1153 .await
1154 .unwrap()
1155 .bytes()
1156 .await
1157 .unwrap();
1158 assert_eq!(&*read_data, data);
1159 }
1160
1161 #[tokio::test]
1162 async fn unknown_length() {
1163 let root = TempDir::new().unwrap();
1164 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1165
1166 let location = Path::from("some_file");
1167
1168 let data = Bytes::from("arbitrary data");
1169
1170 integration
1171 .put(&location, data.clone().into())
1172 .await
1173 .unwrap();
1174
1175 let read_data = integration
1176 .get(&location)
1177 .await
1178 .unwrap()
1179 .bytes()
1180 .await
1181 .unwrap();
1182 assert_eq!(&*read_data, data);
1183 }
1184
1185 #[tokio::test]
1186 async fn range_request_start_beyond_end_of_file() {
1187 let root = TempDir::new().unwrap();
1188 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1189
1190 let location = Path::from("some_file");
1191
1192 let data = Bytes::from("arbitrary data");
1193
1194 integration
1195 .put(&location, data.clone().into())
1196 .await
1197 .unwrap();
1198
1199 integration
1200 .get_range(&location, 100..200)
1201 .await
1202 .expect_err("Should error with start range beyond end of file");
1203 }
1204
1205 #[tokio::test]
1206 async fn range_request_beyond_end_of_file() {
1207 let root = TempDir::new().unwrap();
1208 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1209
1210 let location = Path::from("some_file");
1211
1212 let data = Bytes::from("arbitrary data");
1213
1214 integration
1215 .put(&location, data.clone().into())
1216 .await
1217 .unwrap();
1218
1219 let read_data = integration.get_range(&location, 0..100).await.unwrap();
1220 assert_eq!(&*read_data, data);
1221 }
1222
/// Listing a root directory with all permissions removed should surface an
/// error from both `list` and `list_with_delimiter`.
///
/// Marked `#[ignore]`, so it does not run by default.
#[tokio::test]
#[cfg(target_family = "unix")]
#[ignore]
async fn bubble_up_io_errors() {
    use std::{fs::set_permissions, os::unix::prelude::PermissionsExt};

    let root = TempDir::new().unwrap();

    // Strip every permission bit from the root directory.
    let metadata = root.path().metadata().unwrap();
    let mut permissions = metadata.permissions();
    permissions.set_mode(0o000);
    set_permissions(root.path(), permissions).unwrap();

    let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();

    // At least one item of the listing stream must be an error.
    let mut stream = store.list(None);
    let mut any_err = false;
    while let Some(res) = stream.next().await {
        if res.is_err() {
            any_err = true;
        }
    }
    assert!(any_err);

    assert!(store.list_with_delimiter(None).await.is_err());
}
1252
1253 const NON_EXISTENT_NAME: &str = "nonexistentname";
1254
/// Fetching a missing object must yield `crate::Error::NotFound` whose source
/// is an `std::io::Error` and whose path ends with the requested name.
#[tokio::test]
async fn get_nonexistent_location() {
    let root = TempDir::new().unwrap();
    let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

    let location = Path::from(NON_EXISTENT_NAME);

    let err = get_nonexistent_object(&integration, Some(location))
        .await
        .unwrap_err();
    if let crate::Error::NotFound { path, source } = err {
        let source_variant = source.downcast_ref::<std::io::Error>();
        assert!(
            matches!(source_variant, Some(std::io::Error { .. }),),
            "got: {source_variant:?}"
        );
        assert!(path.ends_with(NON_EXISTENT_NAME), "{}", path);
    } else {
        panic!("unexpected error type: {err:?}");
    }
}
1276
/// With the default (`file:///`) root, an absolute path round-trips through
/// `path_to_filesystem`, and the object can be `head`ed.
///
/// Relies on a `Cargo.toml` existing in the current working directory.
#[tokio::test]
async fn root() {
    let integration = LocalFileSystem::new();

    let canonical = std::path::Path::new("Cargo.toml").canonicalize().unwrap();
    let url = Url::from_directory_path(&canonical).unwrap();
    let path = Path::parse(url.path()).unwrap();

    let roundtrip = integration.path_to_filesystem(&path).unwrap();

    // Canonicalize again so both sides are compared in the same form.
    let roundtrip = roundtrip.canonicalize().unwrap();

    assert_eq!(roundtrip, canonical);

    integration.head(&path).await.unwrap();
}
1295
/// On Windows, listing the default `file:///` root must fail with the
/// "unable to convert URL" error.
#[tokio::test]
#[cfg(target_family = "windows")]
async fn test_list_root() {
    let fs = LocalFileSystem::new();
    let r = fs.list_with_delimiter(None).await.unwrap_err().to_string();

    assert!(
        r.contains("Unable to convert URL \"file:///\" to filesystem path"),
        "{}",
        r
    );
}
1308
/// On Linux, listing the default `file:///` root must succeed.
#[tokio::test]
#[cfg(target_os = "linux")]
async fn test_list_root() {
    let fs = LocalFileSystem::new();
    fs.list_with_delimiter(None).await.unwrap();
}
1315
/// Test helper: lists `prefix` and asserts that the sorted object locations
/// equal `expected` (which must therefore be given in sorted order).
#[cfg(target_family = "unix")]
async fn check_list(integration: &LocalFileSystem, prefix: Option<&Path>, expected: &[&str]) {
    let result: Vec<_> = integration.list(prefix).try_collect().await.unwrap();

    let mut strings: Vec<_> = result.iter().map(|x| x.location.as_ref()).collect();
    strings.sort_unstable();
    assert_eq!(&strings, expected)
}
1324
1325 #[tokio::test]
1326 #[cfg(target_family = "unix")]
1327 async fn test_symlink() {
1328 let root = TempDir::new().unwrap();
1329 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1330
1331 let subdir = root.path().join("a");
1332 std::fs::create_dir(&subdir).unwrap();
1333 let file = subdir.join("file.parquet");
1334 std::fs::write(file, "test").unwrap();
1335
1336 check_list(&integration, None, &["a/file.parquet"]).await;
1337 integration
1338 .head(&Path::from("a/file.parquet"))
1339 .await
1340 .unwrap();
1341
1342 let other = NamedTempFile::new().unwrap();
1344 std::os::unix::fs::symlink(other.path(), root.path().join("test.parquet")).unwrap();
1345
1346 check_list(&integration, None, &["a/file.parquet", "test.parquet"]).await;
1348
1349 integration.head(&Path::from("test.parquet")).await.unwrap();
1351
1352 std::os::unix::fs::symlink(&subdir, root.path().join("b")).unwrap();
1354 check_list(
1355 &integration,
1356 None,
1357 &["a/file.parquet", "b/file.parquet", "test.parquet"],
1358 )
1359 .await;
1360 check_list(&integration, Some(&Path::from("b")), &["b/file.parquet"]).await;
1361
1362 integration
1364 .head(&Path::from("b/file.parquet"))
1365 .await
1366 .unwrap();
1367
1368 std::os::unix::fs::symlink(root.path().join("foo.parquet"), root.path().join("c")).unwrap();
1370
1371 check_list(
1372 &integration,
1373 None,
1374 &["a/file.parquet", "b/file.parquet", "test.parquet"],
1375 )
1376 .await;
1377
1378 let mut r = integration.list_with_delimiter(None).await.unwrap();
1379 r.common_prefixes.sort_unstable();
1380 assert_eq!(r.common_prefixes.len(), 2);
1381 assert_eq!(r.common_prefixes[0].as_ref(), "a");
1382 assert_eq!(r.common_prefixes[1].as_ref(), "b");
1383 assert_eq!(r.objects.len(), 1);
1384 assert_eq!(r.objects[0].location.as_ref(), "test.parquet");
1385
1386 let r = integration
1387 .list_with_delimiter(Some(&Path::from("a")))
1388 .await
1389 .unwrap();
1390 assert_eq!(r.common_prefixes.len(), 0);
1391 assert_eq!(r.objects.len(), 1);
1392 assert_eq!(r.objects[0].location.as_ref(), "a/file.parquet");
1393
1394 integration
1396 .delete(&Path::from("test.parquet"))
1397 .await
1398 .unwrap();
1399 assert!(other.path().exists());
1400
1401 check_list(&integration, None, &["a/file.parquet", "b/file.parquet"]).await;
1402
1403 integration
1405 .delete(&Path::from("b/file.parquet"))
1406 .await
1407 .unwrap();
1408
1409 check_list(&integration, None, &[]).await;
1410
1411 integration
1413 .put(&Path::from("b/file.parquet"), vec![0, 1, 2].into())
1414 .await
1415 .unwrap();
1416
1417 check_list(&integration, None, &["a/file.parquet", "b/file.parquet"]).await;
1418 }
1419
1420 #[tokio::test]
1421 async fn invalid_path() {
1422 let root = TempDir::new().unwrap();
1423 let root = root.path().join("🙀");
1424 std::fs::create_dir(root.clone()).unwrap();
1425
1426 let integration = LocalFileSystem::new_with_prefix(root.clone()).unwrap();
1428
1429 let directory = Path::from("directory");
1430 let object = directory.child("child.txt");
1431 let data = Bytes::from("arbitrary");
1432 integration.put(&object, data.clone().into()).await.unwrap();
1433 integration.head(&object).await.unwrap();
1434 let result = integration.get(&object).await.unwrap();
1435 assert_eq!(result.bytes().await.unwrap(), data);
1436
1437 flatten_list_stream(&integration, None).await.unwrap();
1438 flatten_list_stream(&integration, Some(&directory))
1439 .await
1440 .unwrap();
1441
1442 let result = integration
1443 .list_with_delimiter(Some(&directory))
1444 .await
1445 .unwrap();
1446 assert_eq!(result.objects.len(), 1);
1447 assert!(result.common_prefixes.is_empty());
1448 assert_eq!(result.objects[0].location, object);
1449
1450 let emoji = root.join("💀");
1451 std::fs::write(emoji, "foo").unwrap();
1452
1453 let mut paths = flatten_list_stream(&integration, None).await.unwrap();
1455 paths.sort_unstable();
1456
1457 assert_eq!(
1458 paths,
1459 vec![
1460 Path::parse("directory/child.txt").unwrap(),
1461 Path::parse("💀").unwrap()
1462 ]
1463 );
1464 }
1465
1466 #[tokio::test]
1467 async fn list_hides_incomplete_uploads() {
1468 let root = TempDir::new().unwrap();
1469 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1470 let location = Path::from("some_file");
1471
1472 let data = PutPayload::from("arbitrary data");
1473 let mut u1 = integration.put_multipart(&location).await.unwrap();
1474 u1.put_part(data.clone()).await.unwrap();
1475
1476 let mut u2 = integration.put_multipart(&location).await.unwrap();
1477 u2.put_part(data).await.unwrap();
1478
1479 let list = flatten_list_stream(&integration, None).await.unwrap();
1480 assert_eq!(list.len(), 0);
1481
1482 assert_eq!(
1483 integration
1484 .list_with_delimiter(None)
1485 .await
1486 .unwrap()
1487 .objects
1488 .len(),
1489 0
1490 );
1491 }
1492
1493 #[tokio::test]
1494 async fn test_path_with_offset() {
1495 let root = TempDir::new().unwrap();
1496 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1497
1498 let root_path = root.path();
1499 for i in 0..5 {
1500 let filename = format!("test{i}.parquet");
1501 let file = root_path.join(filename);
1502 std::fs::write(file, "test").unwrap();
1503 }
1504 let filter_str = "test";
1505 let filter = String::from(filter_str);
1506 let offset_str = filter + "1";
1507 let offset = Path::from(offset_str.clone());
1508
1509 let res = integration.list_with_offset(None, &offset);
1511 let offset_paths: Vec<_> = res.map_ok(|x| x.location).try_collect().await.unwrap();
1512 let mut offset_files: Vec<_> = offset_paths
1513 .iter()
1514 .map(|x| String::from(x.filename().unwrap()))
1515 .collect();
1516
1517 let files = fs::read_dir(root_path).unwrap();
1519 let filtered_files = files
1520 .filter_map(Result::ok)
1521 .filter_map(|d| {
1522 d.file_name().to_str().and_then(|f| {
1523 if f.contains(filter_str) {
1524 Some(String::from(f))
1525 } else {
1526 None
1527 }
1528 })
1529 })
1530 .collect::<Vec<_>>();
1531
1532 let mut expected_offset_files: Vec<_> = filtered_files
1533 .iter()
1534 .filter(|s| **s > offset_str)
1535 .cloned()
1536 .collect();
1537
1538 fn do_vecs_match<T: PartialEq>(a: &[T], b: &[T]) -> bool {
1539 let matching = a.iter().zip(b.iter()).filter(|&(a, b)| a == b).count();
1540 matching == a.len() && matching == b.len()
1541 }
1542
1543 offset_files.sort();
1544 expected_offset_files.sort();
1545
1546 assert_eq!(offset_files.len(), expected_offset_files.len());
1550 assert!(do_vecs_match(&expected_offset_files, &offset_files));
1551 }
1552
1553 #[tokio::test]
1554 async fn filesystem_filename_with_percent() {
1555 let temp_dir = TempDir::new().unwrap();
1556 let integration = LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap();
1557 let filename = "L%3ABC.parquet";
1558
1559 std::fs::write(temp_dir.path().join(filename), "foo").unwrap();
1560
1561 let res: Vec<_> = integration.list(None).try_collect().await.unwrap();
1562 assert_eq!(res.len(), 1);
1563 assert_eq!(res[0].location.as_ref(), filename);
1564
1565 let res = integration.list_with_delimiter(None).await.unwrap();
1566 assert_eq!(res.objects.len(), 1);
1567 assert_eq!(res.objects[0].location.as_ref(), filename);
1568 }
1569
1570 #[tokio::test]
1571 async fn relative_paths() {
1572 LocalFileSystem::new_with_prefix(".").unwrap();
1573 LocalFileSystem::new_with_prefix("..").unwrap();
1574 LocalFileSystem::new_with_prefix("../..").unwrap();
1575
1576 let integration = LocalFileSystem::new();
1577 let path = Path::from_filesystem_path(".").unwrap();
1578 integration.list_with_delimiter(Some(&path)).await.unwrap();
1579 }
1580
/// Table-driven check of `is_valid_file_path`: per the cases below, a path is
/// rejected when its final segment ends in `#` followed by ASCII digits, while
/// `#<digits>` elsewhere in the path is accepted.
#[test]
fn test_valid_path() {
    let table: [(&str, bool); 5] = [
        ("foo#123/test.txt", true),
        ("foo#123/test#23.txt", true),
        ("foo#123/test#34", false),
        ("foo😁/test#34", false),
        ("foo/test#😁34", true),
    ];

    for &(raw, is_valid) in table.iter() {
        let parsed = Path::parse(raw).unwrap();
        assert_eq!(is_valid_file_path(&parsed), is_valid);
    }
}
1596
1597 #[tokio::test]
1598 async fn test_intermediate_files() {
1599 let root = TempDir::new().unwrap();
1600 let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1601
1602 let a = Path::parse("foo#123/test.txt").unwrap();
1603 integration.put(&a, "test".into()).await.unwrap();
1604
1605 let list = flatten_list_stream(&integration, None).await.unwrap();
1606 assert_eq!(list, vec![a.clone()]);
1607
1608 std::fs::write(root.path().join("bar#123"), "test").unwrap();
1609
1610 let list = flatten_list_stream(&integration, None).await.unwrap();
1612 assert_eq!(list, vec![a.clone()]);
1613
1614 let b = Path::parse("bar#123").unwrap();
1615 let err = integration.get(&b).await.unwrap_err().to_string();
1616 assert_eq!(err, "Generic LocalFileSystem error: Filenames containing trailing '/#\\d+/' are not supported: bar#123");
1617
1618 let c = Path::parse("foo#123.txt").unwrap();
1619 integration.put(&c, "test".into()).await.unwrap();
1620
1621 let mut list = flatten_list_stream(&integration, None).await.unwrap();
1622 list.sort_unstable();
1623 assert_eq!(list, vec![c, a]);
1624 }
1625
/// On Windows, an object written as `file:name.parquet` must be listed as
/// `file%3Aname.parquet` and still be readable through the original location.
#[tokio::test]
#[cfg(target_os = "windows")]
async fn filesystem_filename_with_colon() {
    let tmp = TempDir::new().unwrap();
    let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();
    let listed_as = Path::parse("file%3Aname.parquet").unwrap();
    let location = Path::parse("file:name.parquet").unwrap();

    store.put(&location, "test".into()).await.unwrap();

    let listed = flatten_list_stream(&store, None).await.unwrap();
    assert_eq!(listed, vec![listed_as]);

    let fetched = store.get(&location).await.unwrap();
    let body = fetched.bytes().await.unwrap();
    assert_eq!(body, Bytes::from("test"));
}
1647
1648 #[tokio::test]
1649 async fn delete_dirs_automatically() {
1650 let root = TempDir::new().unwrap();
1651 let integration = LocalFileSystem::new_with_prefix(root.path())
1652 .unwrap()
1653 .with_automatic_cleanup(true);
1654 let location = Path::from("nested/file/test_file");
1655 let data = Bytes::from("arbitrary data");
1656
1657 integration
1658 .put(&location, data.clone().into())
1659 .await
1660 .unwrap();
1661
1662 let read_data = integration
1663 .get(&location)
1664 .await
1665 .unwrap()
1666 .bytes()
1667 .await
1668 .unwrap();
1669
1670 assert_eq!(&*read_data, data);
1671 assert!(fs::read_dir(root.path()).unwrap().count() > 0);
1672 integration.delete(&location).await.unwrap();
1673 assert!(fs::read_dir(root.path()).unwrap().count() == 0);
1674 }
1675}
1676
#[cfg(not(target_arch = "wasm32"))]
#[cfg(test)]
mod not_wasm_tests {
    use std::time::Duration;
    use tempfile::TempDir;

    use crate::local::LocalFileSystem;
    use crate::{ObjectStore, Path, PutPayload};

    /// Dropping an unfinished multipart upload must remove the file it staged
    /// on disk. The test polls for up to 100 one-millisecond sleeps rather than
    /// expecting the directory to be empty immediately after the drop.
    #[tokio::test]
    async fn test_cleanup_intermediate_files() {
        let tmp = TempDir::new().unwrap();
        let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();

        // Number of entries currently present in the store root.
        let entries_on_disk = || std::fs::read_dir(tmp.path()).unwrap().count();

        let target = Path::from("some_file");
        let chunk = PutPayload::from_static(b"hello");
        let mut pending = store.put_multipart(&target).await.unwrap();
        pending.put_part(chunk).await.unwrap();

        // Exactly one staged file exists while the upload is in flight.
        assert_eq!(entries_on_disk(), 1);
        drop(pending);

        let mut polls_left = 100;
        while polls_left > 0 {
            tokio::time::sleep(Duration::from_millis(1)).await;
            if entries_on_disk() == 0 {
                return;
            }
            polls_left -= 1;
        }
        panic!("Failed to cleanup file in 100ms")
    }
}
1710
#[cfg(target_family = "unix")]
#[cfg(test)]
mod unix_test {
    use std::fs::OpenOptions;

    use nix::sys::stat;
    use nix::unistd;
    use tempfile::TempDir;

    use crate::local::LocalFileSystem;
    use crate::{ObjectStore, Path};

    /// `head` and `get` must succeed on a named pipe (FIFO) located inside the store.
    #[tokio::test]
    async fn test_fifo() {
        let name = "some_file";
        let tmp = TempDir::new().unwrap();
        let store = LocalFileSystem::new_with_prefix(tmp.path()).unwrap();

        let fifo_path = tmp.path().join(name);
        unistd::mkfifo(&fifo_path, stat::Mode::S_IRWXU).unwrap();

        // The write side is opened on a blocking thread so that it runs
        // concurrently with the store operations below.
        let writer = tokio::task::spawn_blocking(move || {
            OpenOptions::new().write(true).open(fifo_path).unwrap()
        });

        let location = Path::from(name);
        store.head(&location).await.unwrap();
        store.get(&location).await.unwrap();

        // Propagate any panic from the writer thread.
        writer.await.unwrap();
    }
}