datafusion_data_access/object_store/
local.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Object store implementation backed by the local file system.

20use std::fs::{self, File, Metadata};
21use std::io;
22use std::io::{BufReader, Read, Seek, SeekFrom};
23use std::sync::Arc;
24
25use async_trait::async_trait;
26use futures::{stream, AsyncRead, StreamExt, TryStreamExt};
27
28use crate::{FileMeta, ListEntry, Result, SizedFile};
29
30use super::{
31    FileMetaStream, ListEntryStream, ObjectReader, ObjectReaderStream, ObjectStore,
32};
33
/// Scheme name associated with the local file system object store.
pub static LOCAL_SCHEME: &str = "file";

/// Object store backed by the local file system.
#[derive(Debug)]
pub struct LocalFileSystem;
39
40#[async_trait]
41impl ObjectStore for LocalFileSystem {
42    async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
43        let prefix = if let Some((_scheme, path)) = prefix.split_once("://") {
44            path
45        } else {
46            prefix
47        };
48        list_all(prefix.to_owned()).await
49    }
50
51    async fn list_dir(
52        &self,
53        prefix: &str,
54        delimiter: Option<String>,
55    ) -> Result<ListEntryStream> {
56        if let Some(d) = delimiter {
57            if d != "/" && d != "\\" {
58                return Err(std::io::Error::new(
59                    std::io::ErrorKind::InvalidInput,
60                    format!("delimiter not supported on local filesystem: {}", d),
61                ));
62            }
63            let mut entry_stream = tokio::fs::read_dir(prefix).await?;
64
65            let list_entries = stream::poll_fn(move |cx| {
66                entry_stream.poll_next_entry(cx).map(|res| match res {
67                    Ok(Some(x)) => Some(Ok(x)),
68                    Ok(None) => None,
69                    Err(err) => Some(Err(err)),
70                })
71            })
72            .then(|entry| async {
73                let entry = entry?;
74                let entry = if entry.file_type().await?.is_dir() {
75                    ListEntry::Prefix(path_as_str(&entry.path())?.to_string())
76                } else {
77                    ListEntry::FileMeta(get_meta(
78                        path_as_str(&entry.path())?.to_string(),
79                        entry.metadata().await?,
80                    ))
81                };
82                Ok(entry)
83            });
84
85            Ok(Box::pin(list_entries))
86        } else {
87            Ok(Box::pin(
88                self.list_file(prefix).await?.map_ok(ListEntry::FileMeta),
89            ))
90        }
91    }
92
93    fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
94        Ok(Arc::new(LocalFileReader::new(file)?))
95    }
96}
97
98/// Try to convert a PathBuf reference into a &str
99pub fn path_as_str(path: &std::path::Path) -> Result<&str> {
100    path.to_str().ok_or_else(|| {
101        io::Error::new(
102            io::ErrorKind::InvalidInput,
103            format!("Invalid path '{}'", path.display()),
104        )
105    })
106}
107
108struct LocalFileReader {
109    file: SizedFile,
110}
111
112impl LocalFileReader {
113    fn new(file: SizedFile) -> Result<Self> {
114        Ok(Self { file })
115    }
116}
117
118#[async_trait]
119impl ObjectReader for LocalFileReader {
120    async fn chunk_reader(
121        &self,
122        _start: u64,
123        _length: usize,
124    ) -> Result<Box<dyn AsyncRead>> {
125        todo!(
126            "implement once async file readers are available (arrow-rs#78, arrow-rs#111)"
127        )
128    }
129
130    fn sync_chunk_reader(
131        &self,
132        start: u64,
133        length: usize,
134    ) -> Result<Box<dyn Read + Send + Sync>> {
135        // A new file descriptor is opened for each chunk reader.
136        // This okay because chunks are usually fairly large.
137        let mut file = File::open(&self.file.path)?;
138        file.seek(SeekFrom::Start(start))?;
139
140        let file = BufReader::new(file.take(length as u64));
141
142        Ok(Box::new(file))
143    }
144
145    fn length(&self) -> u64 {
146        self.file.size
147    }
148}
149
150fn get_meta(path: String, metadata: Metadata) -> FileMeta {
151    FileMeta {
152        sized_file: SizedFile {
153            path,
154            size: metadata.len(),
155        },
156        last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
157    }
158}
159
160async fn list_all(prefix: String) -> Result<FileMetaStream> {
161    async fn find_files_in_dir(
162        path: String,
163        to_visit: &mut Vec<String>,
164    ) -> Result<Vec<FileMeta>> {
165        let mut dir = tokio::fs::read_dir(path).await?;
166        let mut files = Vec::new();
167
168        while let Some(child) = dir.next_entry().await? {
169            let child_path = path_as_str(&child.path())?.to_string();
170            let metadata = child.metadata().await?;
171            if metadata.is_dir() {
172                to_visit.push(child_path.to_string());
173            } else {
174                files.push(get_meta(child_path.to_owned(), metadata))
175            }
176        }
177        files.sort_by(|a, b| a.path().cmp(b.path()));
178        Ok(files)
179    }
180
181    let prefix_meta = tokio::fs::metadata(&prefix).await?;
182    let prefix = prefix.to_owned();
183    if prefix_meta.is_file() {
184        Ok(Box::pin(stream::once(async move {
185            Ok(get_meta(prefix, prefix_meta))
186        })))
187    } else {
188        let result = stream::unfold(vec![prefix], move |mut to_visit| async move {
189            match to_visit.pop() {
190                None => None,
191                Some(path) => {
192                    let file_stream = match find_files_in_dir(path, &mut to_visit).await {
193                        Ok(files) => stream::iter(files).map(Ok).left_stream(),
194                        Err(e) => stream::once(async { Err(e) }).right_stream(),
195                    };
196
197                    Some((file_stream, to_visit))
198                }
199            }
200        })
201        .flatten();
202        Ok(Box::pin(result))
203    }
204}
205
206/// Create a stream of `ObjectReader` by converting each file in the `files` vector
207/// into instances of `LocalFileReader`
208pub fn local_object_reader_stream(files: Vec<String>) -> ObjectReaderStream {
209    Box::pin(futures::stream::iter(files).map(|f| Ok(local_object_reader(f))))
210}
211
212/// Helper method to convert a file location to a `LocalFileReader`
213pub fn local_object_reader(file: String) -> Arc<dyn ObjectReader> {
214    LocalFileSystem
215        .file_reader(local_unpartitioned_file(file).sized_file)
216        .expect("File not found")
217}
218
219/// Helper method to fetch the file size and date at given path and create a `FileMeta`
220pub fn local_unpartitioned_file(file: String) -> FileMeta {
221    let metadata = fs::metadata(&file).expect("Local file metadata");
222    FileMeta {
223        sized_file: SizedFile {
224            size: metadata.len(),
225            path: file,
226        },
227        last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
228    }
229}
230
#[cfg(test)]
mod tests {
    use crate::ListEntry;

    use super::*;
    use futures::StreamExt;
    use std::collections::HashSet;
    use std::fs::create_dir;
    use std::fs::File;
    use tempfile::tempdir;

    /// Converts a (UTF-8) temp-dir path into the `String` form the store uses.
    fn path_string(path: &std::path::Path) -> String {
        path.to_str().unwrap().to_string()
    }

    #[tokio::test]
    async fn test_recursive_listing() -> Result<()> {
        // Layout:
        //   tmp/a.txt
        //   tmp/x/b.txt
        //   tmp/y/c.txt
        let tmp = tempdir()?;
        let root = tmp.path();
        let (x_dir, y_dir) = (root.join("x"), root.join("y"));
        create_dir(&x_dir)?;
        create_dir(&y_dir)?;
        let expected_files = [
            root.join("a.txt"),
            x_dir.join("b.txt"),
            y_dir.join("c.txt"),
        ];
        for path in &expected_files {
            File::create(path)?;
        }

        let mut listed = HashSet::new();
        let mut stream = list_all(path_string(root)).await?;
        while let Some(meta) = stream.next().await {
            let meta = meta?;
            assert_eq!(meta.size(), 0);
            listed.insert(meta.path().to_owned());
        }

        assert_eq!(listed.len(), 3);
        for path in &expected_files {
            assert!(listed.contains(path.to_str().unwrap()));
        }

        Ok(())
    }

    #[tokio::test]
    async fn test_list_dir() -> Result<()> {
        // Layout:
        //   tmp/a.txt
        //   tmp/x/b.txt
        let tmp = tempdir()?;
        let root = tmp.path();
        let x_dir = root.join("x");
        let a_file = root.join("a.txt");
        let b_file = x_dir.join("b.txt");
        create_dir(&x_dir)?;
        File::create(&a_file)?;
        File::create(&b_file)?;

        /// Path carried by a listing entry, whether it is a file or a prefix.
        fn entry_path(entry: ListEntry) -> String {
            match entry {
                ListEntry::Prefix(path) => path,
                ListEntry::FileMeta(meta) => meta.sized_file.path,
            }
        }

        /// Asserts that `actual` yields exactly the paths in `expected`,
        /// ignoring order.
        async fn assert_equal_paths(
            expected: Vec<&std::path::PathBuf>,
            actual: ListEntryStream,
        ) -> Result<()> {
            let want: HashSet<String> = expected.into_iter().map(|p| path_string(p)).collect();
            let got: HashSet<String> = actual.map_ok(entry_path).try_collect().await?;
            assert_eq!(want, got);
            Ok(())
        }

        let root_str = root.to_str().unwrap();

        // Without a delimiter the listing is recursive and contains files only.
        let recursive = LocalFileSystem.list_dir(root_str, None).await?;
        assert_equal_paths(vec![&a_file, &b_file], recursive).await?;

        // With "/" as delimiter only immediate files and directories are listed.
        let shallow = LocalFileSystem
            .list_dir(root_str, Some("/".to_string()))
            .await?;
        assert_equal_paths(vec![&a_file, &x_dir], shallow).await?;

        Ok(())
    }

    #[tokio::test]
    async fn test_list_all_sort_by_filename() -> Result<()> {
        // The files are created in descending name order; the listing must
        // return them in ascending path order.
        let tmp = tempdir()?;
        let names = [
            "file_23590.parquet",
            "file_13690.parquet",
            "file_12590.parquet",
            "file_03590.parquet",
        ];
        for name in names.iter() {
            File::create(tmp.path().join(name))?;
        }

        let mut stream = list_all(path_string(tmp.path())).await?;
        let mut listed = Vec::new();
        while let Some(meta) = stream.next().await {
            listed.push(meta?.path().to_owned());
        }

        let mut sorted_names = names;
        sorted_names.sort_unstable();
        let expected: Vec<String> = sorted_names
            .iter()
            .map(|name| path_string(&tmp.path().join(name)))
            .collect();
        assert_eq!(listed, expected);
        Ok(())
    }
}