object_store/
local.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! An object store implementation for a local filesystem
19use std::fs::{metadata, symlink_metadata, File, Metadata, OpenOptions};
20use std::io::{ErrorKind, Read, Seek, SeekFrom, Write};
21use std::ops::Range;
22use std::sync::Arc;
23use std::time::SystemTime;
24use std::{collections::BTreeSet, io};
25use std::{collections::VecDeque, path::PathBuf};
26
27use async_trait::async_trait;
28use bytes::Bytes;
29use chrono::{DateTime, Utc};
30use futures::{stream::BoxStream, StreamExt};
31use futures::{FutureExt, TryStreamExt};
32use parking_lot::Mutex;
33use url::Url;
34use walkdir::{DirEntry, WalkDir};
35
36use crate::{
37    maybe_spawn_blocking,
38    path::{absolute_path_to_url, Path},
39    util::InvalidGetRange,
40    Attributes, GetOptions, GetResult, GetResultPayload, ListResult, MultipartUpload, ObjectMeta,
41    ObjectStore, PutMode, PutMultipartOptions, PutOptions, PutPayload, PutResult, Result,
42    UploadPart,
43};
44
/// A specialized `Error` for filesystem object store-related errors
///
/// Values of this type are converted into the crate-level error by the
/// `From<Error> for super::Error` impl in this file: `NotFound` and
/// `AlreadyExists` map onto the crate-level variants of the same name, every
/// other variant is wrapped in the crate-level `Generic` variant.
#[derive(Debug, thiserror::Error)]
pub(crate) enum Error {
    /// A directory walk (via `walkdir`) reported an error
    #[error("Unable to walk dir: {}", source)]
    UnableToWalkDir { source: walkdir::Error },

    /// Metadata for `path` could not be obtained
    #[error("Unable to access metadata for {}: {}", path, source)]
    Metadata {
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
        path: String,
    },

    /// Writing payload bytes into a file failed
    #[error("Unable to copy data to file: {}", source)]
    UnableToCopyDataToFile { source: io::Error },

    /// Moving a file into its final location failed
    #[error("Unable to rename file: {}", source)]
    UnableToRenameFile { source: io::Error },

    /// The directory `path` could not be created
    #[error("Unable to create dir {}: {}", path.display(), source)]
    UnableToCreateDir { source: io::Error, path: PathBuf },

    /// The file `path` could not be created
    #[error("Unable to create file {}: {}", path.display(), source)]
    UnableToCreateFile { source: io::Error, path: PathBuf },

    /// The file `path` could not be deleted
    #[error("Unable to delete file {}: {}", path.display(), source)]
    UnableToDeleteFile { source: io::Error, path: PathBuf },

    /// The file `path` could not be opened
    #[error("Unable to open file {}: {}", path.display(), source)]
    UnableToOpenFile { source: io::Error, path: PathBuf },

    /// Reading bytes from the file `path` failed
    #[error("Unable to read data from file {}: {}", path.display(), source)]
    UnableToReadBytes { source: io::Error, path: PathBuf },

    /// A read of `path` did not match the expected size (`expected` vs `actual`)
    #[error("Out of range of file {}, expected: {}, actual: {}", path.display(), expected, actual)]
    OutOfRange {
        path: PathBuf,
        expected: u64,
        actual: u64,
    },

    /// The requested byte range could not be resolved against the file
    #[error("Requested range was invalid")]
    InvalidRange { source: InvalidGetRange },

    /// Copying (or renaming) `from` to `to` failed
    #[error("Unable to copy file from {} to {}: {}", from.display(), to.display(), source)]
    UnableToCopyFile {
        from: PathBuf,
        to: PathBuf,
        source: io::Error,
    },

    /// The file `path` does not exist; converted to the crate-level `NotFound`
    #[error("NotFound")]
    NotFound { path: PathBuf, source: io::Error },

    /// Seeking within the file `path` failed
    #[error("Error seeking file {}: {}", path.display(), source)]
    Seek { source: io::Error, path: PathBuf },

    /// `url` could not be turned into a filesystem path
    #[error("Unable to convert URL \"{}\" to filesystem path", url)]
    InvalidUrl { url: Url },

    /// The destination `path` already exists; converted to the crate-level
    /// `AlreadyExists`
    #[error("AlreadyExists")]
    AlreadyExists { path: String, source: io::Error },

    /// The filesystem root `path` could not be canonicalized
    #[error("Unable to canonicalize filesystem root: {}", path.display())]
    UnableToCanonicalize { path: PathBuf, source: io::Error },

    /// `path` has a filename with a trailing `#<digits>` suffix, which is
    /// reserved for staged uploads
    #[error("Filenames containing trailing '/#\\d+/' are not supported: {}", path)]
    InvalidPath { path: String },

    /// The multipart upload was already completed or aborted
    #[error("Upload aborted")]
    Aborted,
}
116
117impl From<Error> for super::Error {
118    fn from(source: Error) -> Self {
119        match source {
120            Error::NotFound { path, source } => Self::NotFound {
121                path: path.to_string_lossy().to_string(),
122                source: source.into(),
123            },
124            Error::AlreadyExists { path, source } => Self::AlreadyExists {
125                path,
126                source: source.into(),
127            },
128            _ => Self::Generic {
129                store: "LocalFileSystem",
130                source: Box::new(source),
131            },
132        }
133    }
134}
135
/// Local filesystem storage providing an [`ObjectStore`] interface to files on
/// local disk. Can optionally be created with a directory prefix
///
/// # Path Semantics
///
/// This implementation follows the [file URI] scheme outlined in [RFC 3986]. In
/// particular paths are delimited by `/`
///
/// [file URI]: https://en.wikipedia.org/wiki/File_URI_scheme
/// [RFC 3986]: https://www.rfc-editor.org/rfc/rfc3986
///
/// # Filesystem Restrictions
///
/// [`LocalFileSystem`] will expose the path semantics of the underlying filesystem, which may
/// have additional restrictions beyond those enforced by [`Path`].
///
/// For example:
///
/// * Windows forbids certain filenames, e.g. `COM0`,
/// * Windows forbids folders with trailing `.`
/// * Windows forbids certain ASCII characters, e.g. `<` or `|`
/// * OS X forbids filenames containing `:`
/// * Leading `-` are discouraged on Unix systems where they may be interpreted as CLI flags
/// * Filesystems may have restrictions on the maximum path or path segment length
/// * Filesystem support for non-ASCII characters is inconsistent
///
/// Additionally some filesystems, such as NTFS, are case-insensitive, whilst others like
/// FAT don't preserve case at all. Further some filesystems support non-unicode character
/// sequences, such as unpaired UTF-16 surrogates, and [`LocalFileSystem`] will error on
/// encountering such sequences.
///
/// Finally, filenames matching the regex `/.*#\d+/`, e.g. `foo.parquet#123`, are not supported
/// by [`LocalFileSystem`] as they are used to provide atomic writes. Such files will be ignored
/// for listing operations, and attempting to address such a file will error.
///
/// # Tokio Compatibility
///
/// Tokio discourages performing blocking IO on a tokio worker thread, however,
/// no major operating systems have stable async file APIs. Therefore if called from
/// a tokio context, this will use [`tokio::runtime::Handle::spawn_blocking`] to dispatch
/// IO to a blocking thread pool, much like `tokio::fs` does under-the-hood.
///
/// If not called from a tokio context, this will perform IO on the current thread with
/// no additional complexity or overheads
///
/// # Symlinks
///
/// [`LocalFileSystem`] will follow symlinks as normal, however, it is worth noting:
///
/// * Broken symlinks will be silently ignored by listing operations
/// * No effort is made to prevent breaking symlinks when deleting files
/// * Symlinks that resolve to paths outside the root **will** be followed
/// * Mutating a file through one or more symlinks will mutate the underlying file
/// * Deleting a path that resolves to a symlink will only delete the symlink
///
/// # Cross-Filesystem Copy
///
/// [`LocalFileSystem::copy`] is implemented using [`std::fs::hard_link`], and therefore
/// does not support copying across filesystem boundaries.
///
#[derive(Debug)]
pub struct LocalFileSystem {
    /// Shared configuration (the root URL); held in an `Arc` so it can be
    /// moved into blocking closures and listing streams
    config: Arc<Config>,
    /// When `true`, `delete` also removes parent directories that were left
    /// empty, walking upwards until the root or a non-removable directory
    automatic_cleanup: bool,
}
202
/// Configuration shared by a [`LocalFileSystem`] and the closures it spawns
#[derive(Debug)]
struct Config {
    /// `file://` URL of the directory that all object paths are resolved against
    root: Url,
}
207
208impl std::fmt::Display for LocalFileSystem {
209    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
210        write!(f, "LocalFileSystem({})", self.config.root)
211    }
212}
213
214impl Default for LocalFileSystem {
215    fn default() -> Self {
216        Self::new()
217    }
218}
219
220impl LocalFileSystem {
221    /// Create new filesystem storage with no prefix
222    pub fn new() -> Self {
223        Self {
224            config: Arc::new(Config {
225                root: Url::parse("file:///").unwrap(),
226            }),
227            automatic_cleanup: false,
228        }
229    }
230
231    /// Create new filesystem storage with `prefix` applied to all paths
232    ///
233    /// Returns an error if the path does not exist
234    ///
235    pub fn new_with_prefix(prefix: impl AsRef<std::path::Path>) -> Result<Self> {
236        let path = std::fs::canonicalize(&prefix).map_err(|source| {
237            let path = prefix.as_ref().into();
238            Error::UnableToCanonicalize { source, path }
239        })?;
240
241        Ok(Self {
242            config: Arc::new(Config {
243                root: absolute_path_to_url(path)?,
244            }),
245            automatic_cleanup: false,
246        })
247    }
248
249    /// Return an absolute filesystem path of the given file location
250    pub fn path_to_filesystem(&self, location: &Path) -> Result<PathBuf> {
251        if !is_valid_file_path(location) {
252            let path = location.as_ref().into();
253            let error = Error::InvalidPath { path };
254            return Err(error.into());
255        }
256
257        let path = self.config.prefix_to_filesystem(location)?;
258
259        #[cfg(target_os = "windows")]
260        let path = {
261            let path = path.to_string_lossy();
262
263            // Assume the first char is the drive letter and the next is a colon.
264            let mut out = String::new();
265            let drive = &path[..2]; // The drive letter and colon (e.g., "C:")
266            let filepath = &path[2..].replace(':', "%3A"); // Replace subsequent colons
267            out.push_str(drive);
268            out.push_str(filepath);
269            PathBuf::from(out)
270        };
271
272        Ok(path)
273    }
274
275    /// Enable automatic cleanup of empty directories when deleting files
276    pub fn with_automatic_cleanup(mut self, automatic_cleanup: bool) -> Self {
277        self.automatic_cleanup = automatic_cleanup;
278        self
279    }
280}
281
282impl Config {
283    /// Return an absolute filesystem path of the given location
284    fn prefix_to_filesystem(&self, location: &Path) -> Result<PathBuf> {
285        let mut url = self.root.clone();
286        url.path_segments_mut()
287            .expect("url path")
288            // technically not necessary as Path ignores empty segments
289            // but avoids creating paths with "//" which look odd in error messages.
290            .pop_if_empty()
291            .extend(location.parts());
292
293        url.to_file_path()
294            .map_err(|_| Error::InvalidUrl { url }.into())
295    }
296
297    /// Resolves the provided absolute filesystem path to a [`Path`] prefix
298    fn filesystem_to_path(&self, location: &std::path::Path) -> Result<Path> {
299        Ok(Path::from_absolute_path_with_base(
300            location,
301            Some(&self.root),
302        )?)
303    }
304}
305
306fn is_valid_file_path(path: &Path) -> bool {
307    match path.filename() {
308        Some(p) => match p.split_once('#') {
309            Some((_, suffix)) if !suffix.is_empty() => {
310                // Valid if contains non-digits
311                !suffix.as_bytes().iter().all(|x| x.is_ascii_digit())
312            }
313            _ => true,
314        },
315        None => false,
316    }
317}
318
319#[async_trait]
320impl ObjectStore for LocalFileSystem {
321    async fn put_opts(
322        &self,
323        location: &Path,
324        payload: PutPayload,
325        opts: PutOptions,
326    ) -> Result<PutResult> {
327        if matches!(opts.mode, PutMode::Update(_)) {
328            return Err(crate::Error::NotImplemented);
329        }
330
331        if !opts.attributes.is_empty() {
332            return Err(crate::Error::NotImplemented);
333        }
334
335        let path = self.path_to_filesystem(location)?;
336        maybe_spawn_blocking(move || {
337            let (mut file, staging_path) = new_staged_upload(&path)?;
338            let mut e_tag = None;
339
340            let err = match payload.iter().try_for_each(|x| file.write_all(x)) {
341                Ok(_) => {
342                    let metadata = file.metadata().map_err(|e| Error::Metadata {
343                        source: e.into(),
344                        path: path.to_string_lossy().to_string(),
345                    })?;
346                    e_tag = Some(get_etag(&metadata));
347                    match opts.mode {
348                        PutMode::Overwrite => {
349                            // For some fuse types of file systems, the file must be closed first
350                            // to trigger the upload operation, and then renamed, such as Blobfuse
351                            std::mem::drop(file);
352                            match std::fs::rename(&staging_path, &path) {
353                                Ok(_) => None,
354                                Err(source) => Some(Error::UnableToRenameFile { source }),
355                            }
356                        }
357                        PutMode::Create => match std::fs::hard_link(&staging_path, &path) {
358                            Ok(_) => {
359                                let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup
360                                None
361                            }
362                            Err(source) => match source.kind() {
363                                ErrorKind::AlreadyExists => Some(Error::AlreadyExists {
364                                    path: path.to_str().unwrap().to_string(),
365                                    source,
366                                }),
367                                _ => Some(Error::UnableToRenameFile { source }),
368                            },
369                        },
370                        PutMode::Update(_) => unreachable!(),
371                    }
372                }
373                Err(source) => Some(Error::UnableToCopyDataToFile { source }),
374            };
375
376            if let Some(err) = err {
377                let _ = std::fs::remove_file(&staging_path); // Attempt to cleanup
378                return Err(err.into());
379            }
380
381            Ok(PutResult {
382                e_tag,
383                version: None,
384            })
385        })
386        .await
387    }
388
389    async fn put_multipart_opts(
390        &self,
391        location: &Path,
392        opts: PutMultipartOptions,
393    ) -> Result<Box<dyn MultipartUpload>> {
394        if !opts.attributes.is_empty() {
395            return Err(crate::Error::NotImplemented);
396        }
397
398        let dest = self.path_to_filesystem(location)?;
399        let (file, src) = new_staged_upload(&dest)?;
400        Ok(Box::new(LocalUpload::new(src, dest, file)))
401    }
402
403    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
404        let location = location.clone();
405        let path = self.path_to_filesystem(&location)?;
406        maybe_spawn_blocking(move || {
407            let (file, metadata) = open_file(&path)?;
408            let meta = convert_metadata(metadata, location);
409            options.check_preconditions(&meta)?;
410
411            let range = match options.range {
412                Some(r) => r
413                    .as_range(meta.size)
414                    .map_err(|source| Error::InvalidRange { source })?,
415                None => 0..meta.size,
416            };
417
418            Ok(GetResult {
419                payload: GetResultPayload::File(file, path),
420                attributes: Attributes::default(),
421                range,
422                meta,
423            })
424        })
425        .await
426    }
427
428    async fn get_range(&self, location: &Path, range: Range<u64>) -> Result<Bytes> {
429        let path = self.path_to_filesystem(location)?;
430        maybe_spawn_blocking(move || {
431            let (mut file, metadata) = open_file(&path)?;
432            read_range(&mut file, metadata.len(), &path, range)
433        })
434        .await
435    }
436
437    async fn get_ranges(&self, location: &Path, ranges: &[Range<u64>]) -> Result<Vec<Bytes>> {
438        let path = self.path_to_filesystem(location)?;
439        let ranges = ranges.to_vec();
440        maybe_spawn_blocking(move || {
441            // Vectored IO might be faster
442            let (mut file, metadata) = open_file(&path)?;
443            ranges
444                .into_iter()
445                .map(|r| read_range(&mut file, metadata.len(), &path, r))
446                .collect()
447        })
448        .await
449    }
450
451    async fn delete(&self, location: &Path) -> Result<()> {
452        let config = Arc::clone(&self.config);
453        let path = self.path_to_filesystem(location)?;
454        let automactic_cleanup = self.automatic_cleanup;
455        maybe_spawn_blocking(move || {
456            if let Err(e) = std::fs::remove_file(&path) {
457                Err(match e.kind() {
458                    ErrorKind::NotFound => Error::NotFound { path, source: e }.into(),
459                    _ => Error::UnableToDeleteFile { path, source: e }.into(),
460                })
461            } else if automactic_cleanup {
462                let root = &config.root;
463                let root = root
464                    .to_file_path()
465                    .map_err(|_| Error::InvalidUrl { url: root.clone() })?;
466
467                // here we will try to traverse up and delete an empty dir if possible until we reach the root or get an error
468                let mut parent = path.parent();
469
470                while let Some(loc) = parent {
471                    if loc != root && std::fs::remove_dir(loc).is_ok() {
472                        parent = loc.parent();
473                    } else {
474                        break;
475                    }
476                }
477
478                Ok(())
479            } else {
480                Ok(())
481            }
482        })
483        .await
484    }
485
486    fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
487        self.list_with_maybe_offset(prefix, None)
488    }
489
490    fn list_with_offset(
491        &self,
492        prefix: Option<&Path>,
493        offset: &Path,
494    ) -> BoxStream<'static, Result<ObjectMeta>> {
495        self.list_with_maybe_offset(prefix, Some(offset))
496    }
497
498    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
499        let config = Arc::clone(&self.config);
500
501        let prefix = prefix.cloned().unwrap_or_default();
502        let resolved_prefix = config.prefix_to_filesystem(&prefix)?;
503
504        maybe_spawn_blocking(move || {
505            let walkdir = WalkDir::new(&resolved_prefix)
506                .min_depth(1)
507                .max_depth(1)
508                .follow_links(true);
509
510            let mut common_prefixes = BTreeSet::new();
511            let mut objects = Vec::new();
512
513            for entry_res in walkdir.into_iter().map(convert_walkdir_result) {
514                if let Some(entry) = entry_res? {
515                    let is_directory = entry.file_type().is_dir();
516                    let entry_location = config.filesystem_to_path(entry.path())?;
517                    if !is_directory && !is_valid_file_path(&entry_location) {
518                        continue;
519                    }
520
521                    let mut parts = match entry_location.prefix_match(&prefix) {
522                        Some(parts) => parts,
523                        None => continue,
524                    };
525
526                    let common_prefix = match parts.next() {
527                        Some(p) => p,
528                        None => continue,
529                    };
530
531                    drop(parts);
532
533                    if is_directory {
534                        common_prefixes.insert(prefix.child(common_prefix));
535                    } else if let Some(metadata) = convert_entry(entry, entry_location)? {
536                        objects.push(metadata);
537                    }
538                }
539            }
540
541            Ok(ListResult {
542                common_prefixes: common_prefixes.into_iter().collect(),
543                objects,
544            })
545        })
546        .await
547    }
548
549    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
550        let from = self.path_to_filesystem(from)?;
551        let to = self.path_to_filesystem(to)?;
552        let mut id = 0;
553        // In order to make this atomic we:
554        //
555        // - hard link to a hidden temporary file
556        // - atomically rename this temporary file into place
557        //
558        // This is necessary because hard_link returns an error if the destination already exists
559        maybe_spawn_blocking(move || loop {
560            let staged = staged_upload_path(&to, &id.to_string());
561            match std::fs::hard_link(&from, &staged) {
562                Ok(_) => {
563                    return std::fs::rename(&staged, &to).map_err(|source| {
564                        let _ = std::fs::remove_file(&staged); // Attempt to clean up
565                        Error::UnableToCopyFile { from, to, source }.into()
566                    });
567                }
568                Err(source) => match source.kind() {
569                    ErrorKind::AlreadyExists => id += 1,
570                    ErrorKind::NotFound => match from.exists() {
571                        true => create_parent_dirs(&to, source)?,
572                        false => return Err(Error::NotFound { path: from, source }.into()),
573                    },
574                    _ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
575                },
576            }
577        })
578        .await
579    }
580
581    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
582        let from = self.path_to_filesystem(from)?;
583        let to = self.path_to_filesystem(to)?;
584        maybe_spawn_blocking(move || loop {
585            match std::fs::rename(&from, &to) {
586                Ok(_) => return Ok(()),
587                Err(source) => match source.kind() {
588                    ErrorKind::NotFound => match from.exists() {
589                        true => create_parent_dirs(&to, source)?,
590                        false => return Err(Error::NotFound { path: from, source }.into()),
591                    },
592                    _ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
593                },
594            }
595        })
596        .await
597    }
598
599    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
600        let from = self.path_to_filesystem(from)?;
601        let to = self.path_to_filesystem(to)?;
602
603        maybe_spawn_blocking(move || loop {
604            match std::fs::hard_link(&from, &to) {
605                Ok(_) => return Ok(()),
606                Err(source) => match source.kind() {
607                    ErrorKind::AlreadyExists => {
608                        return Err(Error::AlreadyExists {
609                            path: to.to_str().unwrap().to_string(),
610                            source,
611                        }
612                        .into())
613                    }
614                    ErrorKind::NotFound => match from.exists() {
615                        true => create_parent_dirs(&to, source)?,
616                        false => return Err(Error::NotFound { path: from, source }.into()),
617                    },
618                    _ => return Err(Error::UnableToCopyFile { from, to, source }.into()),
619                },
620            }
621        })
622        .await
623    }
624}
625
626impl LocalFileSystem {
627    fn list_with_maybe_offset(
628        &self,
629        prefix: Option<&Path>,
630        maybe_offset: Option<&Path>,
631    ) -> BoxStream<'static, Result<ObjectMeta>> {
632        let config = Arc::clone(&self.config);
633
634        let root_path = match prefix {
635            Some(prefix) => match config.prefix_to_filesystem(prefix) {
636                Ok(path) => path,
637                Err(e) => return futures::future::ready(Err(e)).into_stream().boxed(),
638            },
639            None => config.root.to_file_path().unwrap(),
640        };
641
642        let walkdir = WalkDir::new(root_path)
643            // Don't include the root directory itself
644            .min_depth(1)
645            .follow_links(true);
646
647        let maybe_offset = maybe_offset.cloned();
648
649        let s = walkdir.into_iter().flat_map(move |result_dir_entry| {
650            // Apply offset filter before proceeding, to reduce statx file system calls
651            // This matters for NFS mounts
652            if let (Some(offset), Ok(entry)) = (maybe_offset.as_ref(), result_dir_entry.as_ref()) {
653                let location = config.filesystem_to_path(entry.path());
654                match location {
655                    Ok(path) if path <= *offset => return None,
656                    Err(e) => return Some(Err(e)),
657                    _ => {}
658                }
659            }
660
661            let entry = match convert_walkdir_result(result_dir_entry).transpose()? {
662                Ok(entry) => entry,
663                Err(e) => return Some(Err(e)),
664            };
665
666            if !entry.path().is_file() {
667                return None;
668            }
669
670            match config.filesystem_to_path(entry.path()) {
671                Ok(path) => match is_valid_file_path(&path) {
672                    true => convert_entry(entry, path).transpose(),
673                    false => None,
674                },
675                Err(e) => Some(Err(e)),
676            }
677        });
678
679        // If no tokio context, return iterator directly as no
680        // need to perform chunked spawn_blocking reads
681        if tokio::runtime::Handle::try_current().is_err() {
682            return futures::stream::iter(s).boxed();
683        }
684
685        // Otherwise list in batches of CHUNK_SIZE
686        const CHUNK_SIZE: usize = 1024;
687
688        let buffer = VecDeque::with_capacity(CHUNK_SIZE);
689        futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move {
690            if buffer.is_empty() {
691                (s, buffer) = tokio::task::spawn_blocking(move || {
692                    for _ in 0..CHUNK_SIZE {
693                        match s.next() {
694                            Some(r) => buffer.push_back(r),
695                            None => break,
696                        }
697                    }
698                    (s, buffer)
699                })
700                .await?;
701            }
702
703            match buffer.pop_front() {
704                Some(Err(e)) => Err(e),
705                Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))),
706                None => Ok(None),
707            }
708        })
709        .boxed()
710    }
711}
712
713/// Creates the parent directories of `path` or returns an error based on `source` if no parent
714fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> {
715    let parent = path.parent().ok_or_else(|| {
716        let path = path.to_path_buf();
717        Error::UnableToCreateFile { path, source }
718    })?;
719
720    std::fs::create_dir_all(parent).map_err(|source| {
721        let path = parent.into();
722        Error::UnableToCreateDir { source, path }
723    })?;
724    Ok(())
725}
726
727/// Generates a unique file path `{base}#{suffix}`, returning the opened `File` and `path`
728///
729/// Creates any directories if necessary
730fn new_staged_upload(base: &std::path::Path) -> Result<(File, PathBuf)> {
731    let mut multipart_id = 1;
732    loop {
733        let suffix = multipart_id.to_string();
734        let path = staged_upload_path(base, &suffix);
735        let mut options = OpenOptions::new();
736        match options.read(true).write(true).create_new(true).open(&path) {
737            Ok(f) => return Ok((f, path)),
738            Err(source) => match source.kind() {
739                ErrorKind::AlreadyExists => multipart_id += 1,
740                ErrorKind::NotFound => create_parent_dirs(&path, source)?,
741                _ => return Err(Error::UnableToOpenFile { source, path }.into()),
742            },
743        }
744    }
745}
746
/// Returns the staging path for `dest`: the destination path with
/// `#{suffix}` appended to its final component
fn staged_upload_path(dest: &std::path::Path, suffix: &str) -> PathBuf {
    let mut staged = dest.to_path_buf().into_os_string();
    staged.push("#");
    staged.push(suffix);
    PathBuf::from(staged)
}
754
/// An in-progress multipart upload to the local filesystem.
///
/// Parts are written into a staged file, which is renamed to the final
/// destination on `complete` and removed on `abort` or drop.
#[derive(Debug)]
struct LocalUpload {
    /// The upload state
    state: Arc<UploadState>,
    /// The location of the temporary file
    ///
    /// `None` once the upload has been completed or aborted
    src: Option<PathBuf>,
    /// The next offset to write into the file
    offset: u64,
}
764
/// State shared between a [`LocalUpload`] and its in-flight part writes
#[derive(Debug)]
struct UploadState {
    /// The final location of the uploaded file
    dest: PathBuf,
    /// The staged file; the mutex serializes seek + write of each part
    file: Mutex<File>,
}
770
771impl LocalUpload {
772    pub(crate) fn new(src: PathBuf, dest: PathBuf, file: File) -> Self {
773        Self {
774            state: Arc::new(UploadState {
775                dest,
776                file: Mutex::new(file),
777            }),
778            src: Some(src),
779            offset: 0,
780        }
781    }
782}
783
784#[async_trait]
785impl MultipartUpload for LocalUpload {
786    fn put_part(&mut self, data: PutPayload) -> UploadPart {
787        let offset = self.offset;
788        self.offset += data.content_length() as u64;
789
790        let s = Arc::clone(&self.state);
791        maybe_spawn_blocking(move || {
792            let mut file = s.file.lock();
793            file.seek(SeekFrom::Start(offset)).map_err(|source| {
794                let path = s.dest.clone();
795                Error::Seek { source, path }
796            })?;
797
798            data.iter()
799                .try_for_each(|x| file.write_all(x))
800                .map_err(|source| Error::UnableToCopyDataToFile { source })?;
801
802            Ok(())
803        })
804        .boxed()
805    }
806
807    async fn complete(&mut self) -> Result<PutResult> {
808        let src = self.src.take().ok_or(Error::Aborted)?;
809        let s = Arc::clone(&self.state);
810        maybe_spawn_blocking(move || {
811            // Ensure no inflight writes
812            let file = s.file.lock();
813            std::fs::rename(&src, &s.dest)
814                .map_err(|source| Error::UnableToRenameFile { source })?;
815            let metadata = file.metadata().map_err(|e| Error::Metadata {
816                source: e.into(),
817                path: src.to_string_lossy().to_string(),
818            })?;
819
820            Ok(PutResult {
821                e_tag: Some(get_etag(&metadata)),
822                version: None,
823            })
824        })
825        .await
826    }
827
828    async fn abort(&mut self) -> Result<()> {
829        let src = self.src.take().ok_or(Error::Aborted)?;
830        maybe_spawn_blocking(move || {
831            std::fs::remove_file(&src)
832                .map_err(|source| Error::UnableToDeleteFile { source, path: src })?;
833            Ok(())
834        })
835        .await
836    }
837}
838
839impl Drop for LocalUpload {
840    fn drop(&mut self) {
841        if let Some(src) = self.src.take() {
842            // Try to clean up intermediate file ignoring any error
843            match tokio::runtime::Handle::try_current() {
844                Ok(r) => drop(r.spawn_blocking(move || std::fs::remove_file(src))),
845                Err(_) => drop(std::fs::remove_file(src)),
846            };
847        }
848    }
849}
850
851pub(crate) fn chunked_stream(
852    mut file: File,
853    path: PathBuf,
854    range: Range<u64>,
855    chunk_size: usize,
856) -> BoxStream<'static, Result<Bytes, super::Error>> {
857    futures::stream::once(async move {
858        let (file, path) = maybe_spawn_blocking(move || {
859            file.seek(SeekFrom::Start(range.start as _))
860                .map_err(|source| Error::Seek {
861                    source,
862                    path: path.clone(),
863                })?;
864            Ok((file, path))
865        })
866        .await?;
867
868        let stream = futures::stream::try_unfold(
869            (file, path, range.end - range.start),
870            move |(mut file, path, remaining)| {
871                maybe_spawn_blocking(move || {
872                    if remaining == 0 {
873                        return Ok(None);
874                    }
875
876                    let to_read = remaining.min(chunk_size as u64);
877                    let cap = usize::try_from(to_read).map_err(|_e| Error::InvalidRange {
878                        source: InvalidGetRange::TooLarge {
879                            requested: to_read,
880                            max: usize::MAX as u64,
881                        },
882                    })?;
883                    let mut buffer = Vec::with_capacity(cap);
884                    let read = (&mut file)
885                        .take(to_read)
886                        .read_to_end(&mut buffer)
887                        .map_err(|e| Error::UnableToReadBytes {
888                            source: e,
889                            path: path.clone(),
890                        })?;
891
892                    Ok(Some((buffer.into(), (file, path, remaining - read as u64))))
893                })
894            },
895        );
896        Ok::<_, super::Error>(stream)
897    })
898    .try_flatten()
899    .boxed()
900}
901
902pub(crate) fn read_range(
903    file: &mut File,
904    file_len: u64,
905    path: &PathBuf,
906    range: Range<u64>,
907) -> Result<Bytes> {
908    // If none of the range is satisfiable we should error, e.g. if the start offset is beyond the
909    // extents of the file
910    if range.start >= file_len {
911        return Err(Error::InvalidRange {
912            source: InvalidGetRange::StartTooLarge {
913                requested: range.start,
914                length: file_len,
915            },
916        }
917        .into());
918    }
919
920    // Don't read past end of file
921    let to_read = range.end.min(file_len) - range.start;
922
923    file.seek(SeekFrom::Start(range.start)).map_err(|source| {
924        let path = path.into();
925        Error::Seek { source, path }
926    })?;
927
928    let mut buf = Vec::with_capacity(to_read as usize);
929    let read = file.take(to_read).read_to_end(&mut buf).map_err(|source| {
930        let path = path.into();
931        Error::UnableToReadBytes { source, path }
932    })? as u64;
933
934    if read != to_read {
935        let error = Error::OutOfRange {
936            path: path.into(),
937            expected: to_read,
938            actual: read,
939        };
940
941        return Err(error.into());
942    }
943
944    Ok(buf.into())
945}
946
947fn open_file(path: &PathBuf) -> Result<(File, Metadata)> {
948    let ret = match File::open(path).and_then(|f| Ok((f.metadata()?, f))) {
949        Err(e) => Err(match e.kind() {
950            ErrorKind::NotFound => Error::NotFound {
951                path: path.clone(),
952                source: e,
953            },
954            _ => Error::UnableToOpenFile {
955                path: path.clone(),
956                source: e,
957            },
958        }),
959        Ok((metadata, file)) => match !metadata.is_dir() {
960            true => Ok((file, metadata)),
961            false => Err(Error::NotFound {
962                path: path.clone(),
963                source: io::Error::new(ErrorKind::NotFound, "is directory"),
964            }),
965        },
966    }?;
967    Ok(ret)
968}
969
970fn convert_entry(entry: DirEntry, location: Path) -> Result<Option<ObjectMeta>> {
971    match entry.metadata() {
972        Ok(metadata) => Ok(Some(convert_metadata(metadata, location))),
973        Err(e) => {
974            if let Some(io_err) = e.io_error() {
975                if io_err.kind() == ErrorKind::NotFound {
976                    return Ok(None);
977                }
978            }
979            Err(Error::Metadata {
980                source: e.into(),
981                path: location.to_string(),
982            })?
983        }
984    }
985}
986
987fn last_modified(metadata: &Metadata) -> DateTime<Utc> {
988    metadata
989        .modified()
990        .expect("Modified file time should be supported on this platform")
991        .into()
992}
993
994fn get_etag(metadata: &Metadata) -> String {
995    let inode = get_inode(metadata);
996    let size = metadata.len();
997    let mtime = metadata
998        .modified()
999        .ok()
1000        .and_then(|mtime| mtime.duration_since(SystemTime::UNIX_EPOCH).ok())
1001        .unwrap_or_default()
1002        .as_micros();
1003
1004    // Use an ETag scheme based on that used by many popular HTTP servers
1005    // <https://httpd.apache.org/docs/2.2/mod/core.html#fileetag>
1006    // <https://stackoverflow.com/questions/47512043/how-etags-are-generated-and-configured>
1007    format!("{inode:x}-{mtime:x}-{size:x}")
1008}
1009
1010fn convert_metadata(metadata: Metadata, location: Path) -> ObjectMeta {
1011    let last_modified = last_modified(&metadata);
1012
1013    ObjectMeta {
1014        location,
1015        last_modified,
1016        size: metadata.len(),
1017        e_tag: Some(get_etag(&metadata)),
1018        version: None,
1019    }
1020}
1021
/// Returns the inode number of the file described by `metadata`.
///
/// The inode is folded into the ETag to make it more resistant to collisions, as done by
/// popular web servers such as [Apache](https://httpd.apache.org/docs/2.2/mod/core.html#fileetag)
#[cfg(unix)]
fn get_inode(metadata: &Metadata) -> u64 {
    use std::os::unix::fs::MetadataExt;

    metadata.ino()
}

/// Always returns zero: on platforms without inodes the ETag relies on size and mtime only
#[cfg(not(unix))]
fn get_inode(_metadata: &Metadata) -> u64 {
    0_u64
}
1034
1035/// Convert walkdir results and converts not-found errors into `None`.
1036/// Convert broken symlinks to `None`.
1037fn convert_walkdir_result(
1038    res: std::result::Result<DirEntry, walkdir::Error>,
1039) -> Result<Option<DirEntry>> {
1040    match res {
1041        Ok(entry) => {
1042            // To check for broken symlink: call symlink_metadata() - it does not traverse symlinks);
1043            // if ok: check if entry is symlink; and try to read it by calling metadata().
1044            match symlink_metadata(entry.path()) {
1045                Ok(attr) => {
1046                    if attr.is_symlink() {
1047                        let target_metadata = metadata(entry.path());
1048                        match target_metadata {
1049                            Ok(_) => {
1050                                // symlink is valid
1051                                Ok(Some(entry))
1052                            }
1053                            Err(_) => {
1054                                // this is a broken symlink, return None
1055                                Ok(None)
1056                            }
1057                        }
1058                    } else {
1059                        Ok(Some(entry))
1060                    }
1061                }
1062                Err(_) => Ok(None),
1063            }
1064        }
1065
1066        Err(walkdir_err) => match walkdir_err.io_error() {
1067            Some(io_err) => match io_err.kind() {
1068                ErrorKind::NotFound => Ok(None),
1069                _ => Err(Error::UnableToWalkDir {
1070                    source: walkdir_err,
1071                }
1072                .into()),
1073            },
1074            None => Err(Error::UnableToWalkDir {
1075                source: walkdir_err,
1076            }
1077            .into()),
1078        },
1079    }
1080}
1081
1082#[cfg(test)]
1083mod tests {
1084    use std::fs;
1085
1086    use futures::TryStreamExt;
1087    use tempfile::TempDir;
1088
1089    #[cfg(target_family = "unix")]
1090    use tempfile::NamedTempFile;
1091
1092    use crate::integration::*;
1093
1094    use super::*;
1095
    /// Runs the shared integration helpers against a store rooted in a fresh temporary
    /// directory (unix only).
    #[tokio::test]
    #[cfg(target_family = "unix")]
    async fn file_test() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

        // Each helper below is brought in by the glob imports at the top of this module
        put_get_delete_list(&integration).await;
        get_opts(&integration).await;
        list_uses_directories_correctly(&integration).await;
        list_with_delimiter(&integration).await;
        rename_and_copy(&integration).await;
        copy_if_not_exists(&integration).await;
        copy_rename_nonexistent_object(&integration).await;
        stream_get(&integration).await;
        // NOTE(review): the meaning of the `false` flag is defined by `put_opts` — confirm there
        put_opts(&integration, false).await;
    }
1112
    /// Exercises the store from a plain `futures` executor, i.e. without a tokio runtime,
    /// including a manual two-part multipart upload.
    #[test]
    #[cfg(target_family = "unix")]
    fn test_non_tokio() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
        futures::executor::block_on(async move {
            put_get_delete_list(&integration).await;
            list_uses_directories_correctly(&integration).await;
            list_with_delimiter(&integration).await;

            // Can't use stream_get test as WriteMultipart uses a tokio JoinSet
            let p = Path::from("manual_upload");
            let mut upload = integration.put_multipart(&p).await.unwrap();
            upload.put_part("123".into()).await.unwrap();
            upload.put_part("45678".into()).await.unwrap();
            let r = upload.complete().await.unwrap();

            // The ETag reported by `complete` must match the one seen on a later `get`,
            // and the parts must have been concatenated in call order
            let get = integration.get(&p).await.unwrap();
            assert_eq!(get.meta.e_tag.as_ref().unwrap(), r.e_tag.as_ref().unwrap());
            let actual = get.bytes().await.unwrap();
            assert_eq!(actual.as_ref(), b"12345678");
        });
    }
1136
1137    #[tokio::test]
1138    async fn creates_dir_if_not_present() {
1139        let root = TempDir::new().unwrap();
1140        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1141
1142        let location = Path::from("nested/file/test_file");
1143
1144        let data = Bytes::from("arbitrary data");
1145
1146        integration
1147            .put(&location, data.clone().into())
1148            .await
1149            .unwrap();
1150
1151        let read_data = integration
1152            .get(&location)
1153            .await
1154            .unwrap()
1155            .bytes()
1156            .await
1157            .unwrap();
1158        assert_eq!(&*read_data, data);
1159    }
1160
1161    #[tokio::test]
1162    async fn unknown_length() {
1163        let root = TempDir::new().unwrap();
1164        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1165
1166        let location = Path::from("some_file");
1167
1168        let data = Bytes::from("arbitrary data");
1169
1170        integration
1171            .put(&location, data.clone().into())
1172            .await
1173            .unwrap();
1174
1175        let read_data = integration
1176            .get(&location)
1177            .await
1178            .unwrap()
1179            .bytes()
1180            .await
1181            .unwrap();
1182        assert_eq!(&*read_data, data);
1183    }
1184
1185    #[tokio::test]
1186    async fn range_request_start_beyond_end_of_file() {
1187        let root = TempDir::new().unwrap();
1188        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1189
1190        let location = Path::from("some_file");
1191
1192        let data = Bytes::from("arbitrary data");
1193
1194        integration
1195            .put(&location, data.clone().into())
1196            .await
1197            .unwrap();
1198
1199        integration
1200            .get_range(&location, 100..200)
1201            .await
1202            .expect_err("Should error with start range beyond end of file");
1203    }
1204
1205    #[tokio::test]
1206    async fn range_request_beyond_end_of_file() {
1207        let root = TempDir::new().unwrap();
1208        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1209
1210        let location = Path::from("some_file");
1211
1212        let data = Bytes::from("arbitrary data");
1213
1214        integration
1215            .put(&location, data.clone().into())
1216            .await
1217            .unwrap();
1218
1219        let read_data = integration.get_range(&location, 0..100).await.unwrap();
1220        assert_eq!(&*read_data, data);
1221    }
1222
1223    #[tokio::test]
1224    #[cfg(target_family = "unix")]
1225    // Fails on github actions runner (which runs the tests as root)
1226    #[ignore]
1227    async fn bubble_up_io_errors() {
1228        use std::{fs::set_permissions, os::unix::prelude::PermissionsExt};
1229
1230        let root = TempDir::new().unwrap();
1231
1232        // make non-readable
1233        let metadata = root.path().metadata().unwrap();
1234        let mut permissions = metadata.permissions();
1235        permissions.set_mode(0o000);
1236        set_permissions(root.path(), permissions).unwrap();
1237
1238        let store = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1239
1240        let mut stream = store.list(None);
1241        let mut any_err = false;
1242        while let Some(res) = stream.next().await {
1243            if res.is_err() {
1244                any_err = true;
1245            }
1246        }
1247        assert!(any_err);
1248
1249        // `list_with_delimiter
1250        assert!(store.list_with_delimiter(None).await.is_err());
1251    }
1252
    /// Object name used by `get_nonexistent_location`; nothing is ever written under it there.
    const NON_EXISTENT_NAME: &str = "nonexistentname";

    /// Fetching a missing object must yield `crate::Error::NotFound` whose source is an
    /// `std::io::Error` and whose path ends with the requested name.
    #[tokio::test]
    async fn get_nonexistent_location() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

        let location = Path::from(NON_EXISTENT_NAME);

        let err = get_nonexistent_object(&integration, Some(location))
            .await
            .unwrap_err();
        if let crate::Error::NotFound { path, source } = err {
            // The boxed source must downcast to the underlying I/O error
            let source_variant = source.downcast_ref::<std::io::Error>();
            assert!(
                matches!(source_variant, Some(std::io::Error { .. }),),
                "got: {source_variant:?}"
            );
            assert!(path.ends_with(NON_EXISTENT_NAME), "{}", path);
        } else {
            panic!("unexpected error type: {err:?}");
        }
    }
1276
    /// A store created without a prefix maps an absolute object path back to the same
    /// filesystem path it was derived from.
    #[tokio::test]
    async fn root() {
        let integration = LocalFileSystem::new();

        // Turn the canonical location of `Cargo.toml` into an object path via a URL
        let canonical = std::path::Path::new("Cargo.toml").canonicalize().unwrap();
        let url = Url::from_directory_path(&canonical).unwrap();
        let path = Path::parse(url.path()).unwrap();

        let roundtrip = integration.path_to_filesystem(&path).unwrap();

        // Needed as on Windows canonicalize returns extended length path syntax
        // C:\Users\circleci -> \\?\C:\Users\circleci
        let roundtrip = roundtrip.canonicalize().unwrap();

        assert_eq!(roundtrip, canonical);

        integration.head(&path).await.unwrap();
    }
1295
    /// On Windows, listing the root of a prefix-less store fails because `file:///`
    /// cannot be converted to a filesystem path.
    #[tokio::test]
    #[cfg(target_family = "windows")]
    async fn test_list_root() {
        let fs = LocalFileSystem::new();
        let r = fs.list_with_delimiter(None).await.unwrap_err().to_string();

        assert!(
            r.contains("Unable to convert URL \"file:///\" to filesystem path"),
            "{}",
            r
        );
    }

    /// On Linux, listing the root of a prefix-less store succeeds.
    #[tokio::test]
    #[cfg(target_os = "linux")]
    async fn test_list_root() {
        let fs = LocalFileSystem::new();
        fs.list_with_delimiter(None).await.unwrap();
    }
1315
    /// Asserts that listing `prefix` yields exactly the locations in `expected`.
    ///
    /// The listed locations are sorted before comparison, so `expected` must be given in
    /// sorted order.
    #[cfg(target_family = "unix")]
    async fn check_list(integration: &LocalFileSystem, prefix: Option<&Path>, expected: &[&str]) {
        let result: Vec<_> = integration.list(prefix).try_collect().await.unwrap();

        let mut strings: Vec<_> = result.iter().map(|x| x.location.as_ref()).collect();
        strings.sort_unstable();
        assert_eq!(&strings, expected)
    }
1324
    /// Covers symlink handling: valid links (to files and directories, inside and outside
    /// the store root) are followed, broken links are ignored, and deletes/puts through a
    /// link act on the link target's directory.
    #[tokio::test]
    #[cfg(target_family = "unix")]
    async fn test_symlink() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

        // Baseline: one regular file inside a sub-directory
        let subdir = root.path().join("a");
        std::fs::create_dir(&subdir).unwrap();
        let file = subdir.join("file.parquet");
        std::fs::write(file, "test").unwrap();

        check_list(&integration, None, &["a/file.parquet"]).await;
        integration
            .head(&Path::from("a/file.parquet"))
            .await
            .unwrap();

        // Follow out of tree symlink
        let other = NamedTempFile::new().unwrap();
        std::os::unix::fs::symlink(other.path(), root.path().join("test.parquet")).unwrap();

        // Should return test.parquet even though out of tree
        check_list(&integration, None, &["a/file.parquet", "test.parquet"]).await;

        // Can fetch test.parquet
        integration.head(&Path::from("test.parquet")).await.unwrap();

        // Follow in tree symlink
        std::os::unix::fs::symlink(&subdir, root.path().join("b")).unwrap();
        check_list(
            &integration,
            None,
            &["a/file.parquet", "b/file.parquet", "test.parquet"],
        )
        .await;
        check_list(&integration, Some(&Path::from("b")), &["b/file.parquet"]).await;

        // Can fetch through symlink
        integration
            .head(&Path::from("b/file.parquet"))
            .await
            .unwrap();

        // Ignore broken symlink
        std::os::unix::fs::symlink(root.path().join("foo.parquet"), root.path().join("c")).unwrap();

        check_list(
            &integration,
            None,
            &["a/file.parquet", "b/file.parquet", "test.parquet"],
        )
        .await;

        // The delimited listing reports the directory and the directory symlink as common
        // prefixes, and the file symlink as an object; the broken link `c` is absent
        let mut r = integration.list_with_delimiter(None).await.unwrap();
        r.common_prefixes.sort_unstable();
        assert_eq!(r.common_prefixes.len(), 2);
        assert_eq!(r.common_prefixes[0].as_ref(), "a");
        assert_eq!(r.common_prefixes[1].as_ref(), "b");
        assert_eq!(r.objects.len(), 1);
        assert_eq!(r.objects[0].location.as_ref(), "test.parquet");

        let r = integration
            .list_with_delimiter(Some(&Path::from("a")))
            .await
            .unwrap();
        assert_eq!(r.common_prefixes.len(), 0);
        assert_eq!(r.objects.len(), 1);
        assert_eq!(r.objects[0].location.as_ref(), "a/file.parquet");

        // Deleting a symlink doesn't delete the source file
        integration
            .delete(&Path::from("test.parquet"))
            .await
            .unwrap();
        assert!(other.path().exists());

        check_list(&integration, None, &["a/file.parquet", "b/file.parquet"]).await;

        // Deleting through a symlinked directory removes the file from both listings,
        // since `a` and `b` are the same directory
        integration
            .delete(&Path::from("b/file.parquet"))
            .await
            .unwrap();

        check_list(&integration, None, &[]).await;

        // Adding a file through a symlinked directory makes it visible under both paths
        integration
            .put(&Path::from("b/file.parquet"), vec![0, 1, 2].into())
            .await
            .unwrap();

        check_list(&integration, None, &["a/file.parquet", "b/file.parquet"]).await;
    }
1419
    /// Non-ASCII names are accepted both in the directory the store is rooted at and for
    /// files placed directly on disk inside it.
    #[tokio::test]
    async fn invalid_path() {
        let root = TempDir::new().unwrap();
        let root = root.path().join("🙀");
        std::fs::create_dir(root.clone()).unwrap();

        // Invalid paths supported above root of store
        let integration = LocalFileSystem::new_with_prefix(root.clone()).unwrap();

        // Basic put / head / get round trip below the emoji-named root
        let directory = Path::from("directory");
        let object = directory.child("child.txt");
        let data = Bytes::from("arbitrary");
        integration.put(&object, data.clone().into()).await.unwrap();
        integration.head(&object).await.unwrap();
        let result = integration.get(&object).await.unwrap();
        assert_eq!(result.bytes().await.unwrap(), data);

        flatten_list_stream(&integration, None).await.unwrap();
        flatten_list_stream(&integration, Some(&directory))
            .await
            .unwrap();

        let result = integration
            .list_with_delimiter(Some(&directory))
            .await
            .unwrap();
        assert_eq!(result.objects.len(), 1);
        assert!(result.common_prefixes.is_empty());
        assert_eq!(result.objects[0].location, object);

        // Create an emoji-named file behind the store's back
        let emoji = root.join("💀");
        std::fs::write(emoji, "foo").unwrap();

        // Can list illegal file
        let mut paths = flatten_list_stream(&integration, None).await.unwrap();
        paths.sort_unstable();

        assert_eq!(
            paths,
            vec![
                Path::parse("directory/child.txt").unwrap(),
                Path::parse("💀").unwrap()
            ]
        );
    }
1465
1466    #[tokio::test]
1467    async fn list_hides_incomplete_uploads() {
1468        let root = TempDir::new().unwrap();
1469        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1470        let location = Path::from("some_file");
1471
1472        let data = PutPayload::from("arbitrary data");
1473        let mut u1 = integration.put_multipart(&location).await.unwrap();
1474        u1.put_part(data.clone()).await.unwrap();
1475
1476        let mut u2 = integration.put_multipart(&location).await.unwrap();
1477        u2.put_part(data).await.unwrap();
1478
1479        let list = flatten_list_stream(&integration, None).await.unwrap();
1480        assert_eq!(list.len(), 0);
1481
1482        assert_eq!(
1483            integration
1484                .list_with_delimiter(None)
1485                .await
1486                .unwrap()
1487                .objects
1488                .len(),
1489            0
1490        );
1491    }
1492
1493    #[tokio::test]
1494    async fn test_path_with_offset() {
1495        let root = TempDir::new().unwrap();
1496        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
1497
1498        let root_path = root.path();
1499        for i in 0..5 {
1500            let filename = format!("test{i}.parquet");
1501            let file = root_path.join(filename);
1502            std::fs::write(file, "test").unwrap();
1503        }
1504        let filter_str = "test";
1505        let filter = String::from(filter_str);
1506        let offset_str = filter + "1";
1507        let offset = Path::from(offset_str.clone());
1508
1509        // Use list_with_offset to retrieve files
1510        let res = integration.list_with_offset(None, &offset);
1511        let offset_paths: Vec<_> = res.map_ok(|x| x.location).try_collect().await.unwrap();
1512        let mut offset_files: Vec<_> = offset_paths
1513            .iter()
1514            .map(|x| String::from(x.filename().unwrap()))
1515            .collect();
1516
1517        // Check result with direct filesystem read
1518        let files = fs::read_dir(root_path).unwrap();
1519        let filtered_files = files
1520            .filter_map(Result::ok)
1521            .filter_map(|d| {
1522                d.file_name().to_str().and_then(|f| {
1523                    if f.contains(filter_str) {
1524                        Some(String::from(f))
1525                    } else {
1526                        None
1527                    }
1528                })
1529            })
1530            .collect::<Vec<_>>();
1531
1532        let mut expected_offset_files: Vec<_> = filtered_files
1533            .iter()
1534            .filter(|s| **s > offset_str)
1535            .cloned()
1536            .collect();
1537
1538        fn do_vecs_match<T: PartialEq>(a: &[T], b: &[T]) -> bool {
1539            let matching = a.iter().zip(b.iter()).filter(|&(a, b)| a == b).count();
1540            matching == a.len() && matching == b.len()
1541        }
1542
1543        offset_files.sort();
1544        expected_offset_files.sort();
1545
1546        // println!("Expected Offset Files: {:?}", expected_offset_files);
1547        // println!("Actual Offset Files: {:?}", offset_files);
1548
1549        assert_eq!(offset_files.len(), expected_offset_files.len());
1550        assert!(do_vecs_match(&expected_offset_files, &offset_files));
1551    }
1552
1553    #[tokio::test]
1554    async fn filesystem_filename_with_percent() {
1555        let temp_dir = TempDir::new().unwrap();
1556        let integration = LocalFileSystem::new_with_prefix(temp_dir.path()).unwrap();
1557        let filename = "L%3ABC.parquet";
1558
1559        std::fs::write(temp_dir.path().join(filename), "foo").unwrap();
1560
1561        let res: Vec<_> = integration.list(None).try_collect().await.unwrap();
1562        assert_eq!(res.len(), 1);
1563        assert_eq!(res[0].location.as_ref(), filename);
1564
1565        let res = integration.list_with_delimiter(None).await.unwrap();
1566        assert_eq!(res.objects.len(), 1);
1567        assert_eq!(res.objects[0].location.as_ref(), filename);
1568    }
1569
    /// Relative prefixes are accepted by `new_with_prefix`, and a relative filesystem
    /// path can be converted to an object path and listed.
    #[tokio::test]
    async fn relative_paths() {
        LocalFileSystem::new_with_prefix(".").unwrap();
        LocalFileSystem::new_with_prefix("..").unwrap();
        LocalFileSystem::new_with_prefix("../..").unwrap();

        let integration = LocalFileSystem::new();
        let path = Path::from_filesystem_path(".").unwrap();
        integration.list_with_delimiter(Some(&path)).await.unwrap();
    }
1580
    /// Table-driven check of `is_valid_file_path`: each case pairs a path with the
    /// expected verdict.
    #[test]
    fn test_valid_path() {
        // In this table only the paths whose final segment ends in `#<digits>` are
        // expected to be rejected
        let cases = [
            ("foo#123/test.txt", true),
            ("foo#123/test#23.txt", true),
            ("foo#123/test#34", false),
            ("foo😁/test#34", false),
            ("foo/test#😁34", true),
        ];

        for (case, expected) in cases {
            let path = Path::parse(case).unwrap();
            assert_eq!(is_valid_file_path(&path), expected);
        }
    }
1596
    /// Files whose name ends in `#<digits>` are hidden from listings and rejected by
    /// `get`, while names that merely contain such a sequence are treated normally.
    #[tokio::test]
    async fn test_intermediate_files() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

        // `#123` in a directory name is fine
        let a = Path::parse("foo#123/test.txt").unwrap();
        integration.put(&a, "test".into()).await.unwrap();

        let list = flatten_list_stream(&integration, None).await.unwrap();
        assert_eq!(list, vec![a.clone()]);

        // Created directly on disk, bypassing the store
        std::fs::write(root.path().join("bar#123"), "test").unwrap();

        // Should ignore file
        let list = flatten_list_stream(&integration, None).await.unwrap();
        assert_eq!(list, vec![a.clone()]);

        // Requesting the hidden file explicitly is an error
        let b = Path::parse("bar#123").unwrap();
        let err = integration.get(&b).await.unwrap_err().to_string();
        assert_eq!(err, "Generic LocalFileSystem error: Filenames containing trailing '/#\\d+/' are not supported: bar#123");

        // `#123` followed by an extension is fine as well
        let c = Path::parse("foo#123.txt").unwrap();
        integration.put(&c, "test".into()).await.unwrap();

        let mut list = flatten_list_stream(&integration, None).await.unwrap();
        list.sort_unstable();
        assert_eq!(list, vec![c, a]);
    }
1625
    /// On Windows, an object whose name contains `:` is listed with the colon
    /// percent-encoded (`%3A`) and can still be fetched via the original location.
    #[tokio::test]
    #[cfg(target_os = "windows")]
    async fn filesystem_filename_with_colon() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
        // `path` is what the listing is expected to report, `location` is what is written
        let path = Path::parse("file%3Aname.parquet").unwrap();
        let location = Path::parse("file:name.parquet").unwrap();

        integration.put(&location, "test".into()).await.unwrap();
        let list = flatten_list_stream(&integration, None).await.unwrap();
        assert_eq!(list, vec![path.clone()]);

        let result = integration
            .get(&location)
            .await
            .unwrap()
            .bytes()
            .await
            .unwrap();
        assert_eq!(result, Bytes::from("test"));
    }
1647
1648    #[tokio::test]
1649    async fn delete_dirs_automatically() {
1650        let root = TempDir::new().unwrap();
1651        let integration = LocalFileSystem::new_with_prefix(root.path())
1652            .unwrap()
1653            .with_automatic_cleanup(true);
1654        let location = Path::from("nested/file/test_file");
1655        let data = Bytes::from("arbitrary data");
1656
1657        integration
1658            .put(&location, data.clone().into())
1659            .await
1660            .unwrap();
1661
1662        let read_data = integration
1663            .get(&location)
1664            .await
1665            .unwrap()
1666            .bytes()
1667            .await
1668            .unwrap();
1669
1670        assert_eq!(&*read_data, data);
1671        assert!(fs::read_dir(root.path()).unwrap().count() > 0);
1672        integration.delete(&location).await.unwrap();
1673        assert!(fs::read_dir(root.path()).unwrap().count() == 0);
1674    }
1675}
1676
/// Tests that are compiled for every target except `wasm32`.
#[cfg(not(target_arch = "wasm32"))]
#[cfg(test)]
mod not_wasm_tests {
    use std::time::Duration;
    use tempfile::TempDir;

    use crate::local::LocalFileSystem;
    use crate::{ObjectStore, Path, PutPayload};

    /// Dropping an unfinished multipart upload must remove its staged file.
    #[tokio::test]
    async fn test_cleanup_intermediate_files() {
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

        let location = Path::from("some_file");
        let data = PutPayload::from_static(b"hello");
        let mut upload = integration.put_multipart(&location).await.unwrap();
        upload.put_part(data).await.unwrap();

        // Exactly one file (the staged upload) exists before the drop
        let file_count = std::fs::read_dir(root.path()).unwrap().count();
        assert_eq!(file_count, 1);
        drop(upload);

        // The removal may complete after `drop` returns, so poll for up to 100
        // iterations with a 1ms sleep between checks
        for _ in 0..100 {
            tokio::time::sleep(Duration::from_millis(1)).await;
            let file_count = std::fs::read_dir(root.path()).unwrap().count();
            if file_count == 0 {
                return;
            }
        }
        panic!("Failed to cleanup file in 100ms")
    }
}
1710
/// Tests that rely on unix-only facilities (named pipes via `nix`).
#[cfg(target_family = "unix")]
#[cfg(test)]
mod unix_test {
    use std::fs::OpenOptions;

    use nix::sys::stat;
    use nix::unistd;
    use tempfile::TempDir;

    use crate::local::LocalFileSystem;
    use crate::{ObjectStore, Path};

    /// `head` and `get` must succeed on a FIFO located inside the store.
    #[tokio::test]
    async fn test_fifo() {
        let filename = "some_file";
        let root = TempDir::new().unwrap();
        let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();
        let path = root.path().join(filename);
        unistd::mkfifo(&path, stat::Mode::S_IRWXU).unwrap();

        // Need to open read and write side in parallel
        let spawned =
            tokio::task::spawn_blocking(|| OpenOptions::new().write(true).open(path).unwrap());

        let location = Path::from(filename);
        integration.head(&location).await.unwrap();
        integration.get(&location).await.unwrap();

        // Wait for the writer side to finish opening the FIFO
        spawned.await.unwrap();
    }
}