datafusion_data_access/object_store/
local.rs1use std::fs::{self, File, Metadata};
21use std::io;
22use std::io::{BufReader, Read, Seek, SeekFrom};
23use std::sync::Arc;
24
25use async_trait::async_trait;
26use futures::{stream, AsyncRead, StreamExt, TryStreamExt};
27
28use crate::{FileMeta, ListEntry, Result, SizedFile};
29
30use super::{
31 FileMetaStream, ListEntryStream, ObjectReader, ObjectReaderStream, ObjectStore,
32};
33
/// Scheme string for local file system locations, as in `file://...` URIs.
pub static LOCAL_SCHEME: &str = "file";

/// Object store implementation that serves files from the local file system.
///
/// The type carries no state; all behavior lives in its [`ObjectStore`] impl.
#[derive(Debug)]
pub struct LocalFileSystem;
39
40#[async_trait]
41impl ObjectStore for LocalFileSystem {
42 async fn list_file(&self, prefix: &str) -> Result<FileMetaStream> {
43 let prefix = if let Some((_scheme, path)) = prefix.split_once("://") {
44 path
45 } else {
46 prefix
47 };
48 list_all(prefix.to_owned()).await
49 }
50
51 async fn list_dir(
52 &self,
53 prefix: &str,
54 delimiter: Option<String>,
55 ) -> Result<ListEntryStream> {
56 if let Some(d) = delimiter {
57 if d != "/" && d != "\\" {
58 return Err(std::io::Error::new(
59 std::io::ErrorKind::InvalidInput,
60 format!("delimiter not supported on local filesystem: {}", d),
61 ));
62 }
63 let mut entry_stream = tokio::fs::read_dir(prefix).await?;
64
65 let list_entries = stream::poll_fn(move |cx| {
66 entry_stream.poll_next_entry(cx).map(|res| match res {
67 Ok(Some(x)) => Some(Ok(x)),
68 Ok(None) => None,
69 Err(err) => Some(Err(err)),
70 })
71 })
72 .then(|entry| async {
73 let entry = entry?;
74 let entry = if entry.file_type().await?.is_dir() {
75 ListEntry::Prefix(path_as_str(&entry.path())?.to_string())
76 } else {
77 ListEntry::FileMeta(get_meta(
78 path_as_str(&entry.path())?.to_string(),
79 entry.metadata().await?,
80 ))
81 };
82 Ok(entry)
83 });
84
85 Ok(Box::pin(list_entries))
86 } else {
87 Ok(Box::pin(
88 self.list_file(prefix).await?.map_ok(ListEntry::FileMeta),
89 ))
90 }
91 }
92
93 fn file_reader(&self, file: SizedFile) -> Result<Arc<dyn ObjectReader>> {
94 Ok(Arc::new(LocalFileReader::new(file)?))
95 }
96}
97
/// Borrows `path` as a UTF-8 string slice.
///
/// # Errors
/// Returns an `InvalidInput` I/O error whose message contains the (lossily
/// rendered) path when `path` is not valid UTF-8.
pub fn path_as_str(path: &std::path::Path) -> Result<&str> {
    match path.to_str() {
        Some(utf8) => Ok(utf8),
        None => Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("Invalid path '{}'", path.display()),
        )),
    }
}
107
108struct LocalFileReader {
109 file: SizedFile,
110}
111
112impl LocalFileReader {
113 fn new(file: SizedFile) -> Result<Self> {
114 Ok(Self { file })
115 }
116}
117
118#[async_trait]
119impl ObjectReader for LocalFileReader {
120 async fn chunk_reader(
121 &self,
122 _start: u64,
123 _length: usize,
124 ) -> Result<Box<dyn AsyncRead>> {
125 todo!(
126 "implement once async file readers are available (arrow-rs#78, arrow-rs#111)"
127 )
128 }
129
130 fn sync_chunk_reader(
131 &self,
132 start: u64,
133 length: usize,
134 ) -> Result<Box<dyn Read + Send + Sync>> {
135 let mut file = File::open(&self.file.path)?;
138 file.seek(SeekFrom::Start(start))?;
139
140 let file = BufReader::new(file.take(length as u64));
141
142 Ok(Box::new(file))
143 }
144
145 fn length(&self) -> u64 {
146 self.file.size
147 }
148}
149
150fn get_meta(path: String, metadata: Metadata) -> FileMeta {
151 FileMeta {
152 sized_file: SizedFile {
153 path,
154 size: metadata.len(),
155 },
156 last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
157 }
158}
159
160async fn list_all(prefix: String) -> Result<FileMetaStream> {
161 async fn find_files_in_dir(
162 path: String,
163 to_visit: &mut Vec<String>,
164 ) -> Result<Vec<FileMeta>> {
165 let mut dir = tokio::fs::read_dir(path).await?;
166 let mut files = Vec::new();
167
168 while let Some(child) = dir.next_entry().await? {
169 let child_path = path_as_str(&child.path())?.to_string();
170 let metadata = child.metadata().await?;
171 if metadata.is_dir() {
172 to_visit.push(child_path.to_string());
173 } else {
174 files.push(get_meta(child_path.to_owned(), metadata))
175 }
176 }
177 files.sort_by(|a, b| a.path().cmp(b.path()));
178 Ok(files)
179 }
180
181 let prefix_meta = tokio::fs::metadata(&prefix).await?;
182 let prefix = prefix.to_owned();
183 if prefix_meta.is_file() {
184 Ok(Box::pin(stream::once(async move {
185 Ok(get_meta(prefix, prefix_meta))
186 })))
187 } else {
188 let result = stream::unfold(vec![prefix], move |mut to_visit| async move {
189 match to_visit.pop() {
190 None => None,
191 Some(path) => {
192 let file_stream = match find_files_in_dir(path, &mut to_visit).await {
193 Ok(files) => stream::iter(files).map(Ok).left_stream(),
194 Err(e) => stream::once(async { Err(e) }).right_stream(),
195 };
196
197 Some((file_stream, to_visit))
198 }
199 }
200 })
201 .flatten();
202 Ok(Box::pin(result))
203 }
204}
205
206pub fn local_object_reader_stream(files: Vec<String>) -> ObjectReaderStream {
209 Box::pin(futures::stream::iter(files).map(|f| Ok(local_object_reader(f))))
210}
211
212pub fn local_object_reader(file: String) -> Arc<dyn ObjectReader> {
214 LocalFileSystem
215 .file_reader(local_unpartitioned_file(file).sized_file)
216 .expect("File not found")
217}
218
219pub fn local_unpartitioned_file(file: String) -> FileMeta {
221 let metadata = fs::metadata(&file).expect("Local file metadata");
222 FileMeta {
223 sized_file: SizedFile {
224 size: metadata.len(),
225 path: file,
226 },
227 last_modified: metadata.modified().map(chrono::DateTime::from).ok(),
228 }
229}
230
#[cfg(test)]
mod tests {
    use crate::ListEntry;

    use super::*;
    use futures::StreamExt;
    use std::collections::HashSet;
    use std::fs::create_dir;
    use std::fs::File;
    use tempfile::tempdir;

    /// Renders `path` as an owned string; test paths are always valid UTF-8.
    fn utf8(path: &std::path::Path) -> String {
        path.to_str().unwrap().to_string()
    }

    /// `list_all` must descend into sub-directories and report every file.
    #[tokio::test]
    async fn test_recursive_listing() -> Result<()> {
        // Layout under the temporary root:
        //   a.txt
        //   x/b.txt
        //   y/c.txt
        let tmp = tempdir()?;
        let root = tmp.path();
        let dir_x = root.join("x");
        let dir_y = root.join("y");
        let file_a = root.join("a.txt");
        let file_b = dir_x.join("b.txt");
        let file_c = dir_y.join("c.txt");
        create_dir(&dir_x)?;
        create_dir(&dir_y)?;
        File::create(&file_a)?;
        File::create(&file_b)?;
        File::create(&file_c)?;

        let mut seen = HashSet::new();
        let mut listing = list_all(utf8(root)).await?;
        while let Some(item) = listing.next().await {
            let meta = item?;
            assert_eq!(meta.size(), 0);
            seen.insert(meta.path().to_owned());
        }

        assert_eq!(seen.len(), 3);
        for expected in [&file_a, &file_b, &file_c].iter() {
            assert!(seen.contains(expected.to_str().unwrap()));
        }

        Ok(())
    }

    /// `list_dir` lists recursively without a delimiter and only direct
    /// children (directories as prefixes) with the `/` delimiter.
    #[tokio::test]
    async fn test_list_dir() -> Result<()> {
        // Layout under the temporary root:
        //   a.txt
        //   x/b.txt
        let tmp = tempdir()?;
        let dir_x = tmp.path().join("x");
        let file_a = tmp.path().join("a.txt");
        let file_b = dir_x.join("b.txt");
        create_dir(&dir_x)?;
        File::create(&file_a)?;
        File::create(&file_b)?;

        /// Extracts the path from either kind of list entry.
        fn get_path(entry: ListEntry) -> String {
            match entry {
                ListEntry::Prefix(path) => path,
                ListEntry::FileMeta(meta) => meta.sized_file.path,
            }
        }

        /// Asserts that `actual` yields exactly the `expected` paths,
        /// ignoring order.
        async fn assert_equal_paths(
            expected: Vec<&std::path::PathBuf>,
            actual: ListEntryStream,
        ) -> Result<()> {
            let mut wanted: HashSet<String> = HashSet::new();
            for path in expected.iter() {
                wanted.insert(path.to_str().unwrap().to_string());
            }
            let found: HashSet<String> = actual.map_ok(get_path).try_collect().await?;
            assert_eq!(wanted, found);
            Ok(())
        }

        let root = tmp.path().to_str().unwrap();

        // No delimiter: every file, however deeply nested.
        let recursive = LocalFileSystem.list_dir(root, None).await?;
        assert_equal_paths(vec![&file_a, &file_b], recursive).await?;

        // `/` delimiter: direct children only, with the directory itself.
        let shallow = LocalFileSystem
            .list_dir(root, Some("/".to_string()))
            .await?;
        assert_equal_paths(vec![&file_a, &dir_x], shallow).await?;

        Ok(())
    }

    /// Files inside one directory must be listed sorted by path, regardless
    /// of the order in which they were created.
    #[tokio::test]
    async fn test_list_all_sort_by_filename() -> Result<()> {
        let tmp = tempdir()?;
        let root = tmp.path();

        // Created deliberately out of lexical order.
        File::create(root.join("file_23590.parquet"))?;
        File::create(root.join("file_13690.parquet"))?;
        File::create(root.join("file_12590.parquet"))?;
        File::create(root.join("file_03590.parquet"))?;

        let mut listing = list_all(utf8(root)).await?;
        let mut listed = Vec::new();
        while let Some(item) = listing.next().await {
            listed.push(item?.path().to_owned());
        }

        let sorted: Vec<String> = [
            "file_03590.parquet",
            "file_12590.parquet",
            "file_13690.parquet",
            "file_23590.parquet",
        ]
        .iter()
        .map(|name| utf8(&root.join(name)))
        .collect();
        assert_eq!(listed, sorted);
        Ok(())
    }
}