Skip to main content

paimon/io/
file_io.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::error::*;
19use std::collections::HashMap;
20use std::ops::Range;
21use std::sync::Arc;
22use std::time::SystemTime;
23
24use bytes::Bytes;
25use chrono::{DateTime, Utc};
26use opendal::raw::normalize_root;
27use opendal::Operator;
28use snafu::ResultExt;
29use tokio_util::compat::FuturesAsyncWriteCompatExt;
30use url::Url;
31
32use super::Storage;
33
34#[derive(Clone, Debug)]
35pub struct FileIO {
36    storage: Arc<Storage>,
37}
38
39impl FileIO {
40    /// Try to infer file io scheme from path.
41    ///
42    /// The input HashMap is paimon-java's [`Options`](https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/options/Options.java#L60)
43    pub fn from_url(path: &str) -> crate::Result<FileIOBuilder> {
44        let url = Url::parse(path).map_err(|_| Error::ConfigInvalid {
45            message: format!("Invalid URL: {path}"),
46        })?;
47
48        Ok(FileIOBuilder::new(url.scheme()))
49    }
50
51    /// Try to infer file io scheme from path. See [`FileIO`] for supported schemes.
52    ///
53    /// - If it's a valid url, for example `s3://bucket/a`, url scheme will be used, and the rest of the url will be ignored.
54    /// - If it's not a valid url, will try to detect if it's a file path.
55    ///
56    /// Otherwise will return parsing error.
57    pub fn from_path(path: impl AsRef<str>) -> crate::Result<FileIOBuilder> {
58        let path = path.as_ref();
59        let url = if looks_like_windows_drive_path(path) {
60            Url::from_file_path(path).map_err(|_| Error::ConfigInvalid {
61                message: format!("Input {path} is neither a valid url nor path"),
62            })?
63        } else {
64            Url::parse(path)
65                .map_err(|_| Error::ConfigInvalid {
66                    message: format!("Invalid URL: {path}"),
67                })
68                .or_else(|_| {
69                    Url::from_file_path(path).map_err(|_| Error::ConfigInvalid {
70                        message: format!("Input {path} is neither a valid url nor path"),
71                    })
72                })?
73        };
74        Ok(FileIOBuilder::new(url.scheme()))
75    }
76
77    /// Create a new input file to read data.
78    ///
79    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L76>
80    pub fn new_input(&self, path: &str) -> crate::Result<InputFile> {
81        let (op, relative_path) = self.storage.create(path)?;
82        let path = path.to_string();
83        let relative_path_pos = path.len() - relative_path.len();
84        Ok(InputFile {
85            op,
86            path,
87            relative_path_pos,
88        })
89    }
90
91    /// Create a new output file to write data.
92    ///
93    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L87>
94    pub fn new_output(&self, path: &str) -> Result<OutputFile> {
95        let (op, relative_path) = self.storage.create(path)?;
96        let path = path.to_string();
97        let relative_path_pos = path.len() - relative_path.len();
98        Ok(OutputFile {
99            op,
100            path,
101            relative_path_pos,
102        })
103    }
104
105    /// Return a file status object that represents the path.
106    ///
107    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L97>
108    pub async fn get_status(&self, path: &str) -> Result<FileStatus> {
109        let (op, relative_path) = self.storage.create(path)?;
110        let meta = op.stat(relative_path).await.context(IoUnexpectedSnafu {
111            message: format!("Failed to get file status for '{path}'"),
112        })?;
113
114        Ok(FileStatus {
115            size: meta.content_length(),
116            is_dir: meta.is_dir(),
117            last_modified: meta
118                .last_modified()
119                .map(|v| DateTime::<Utc>::from(SystemTime::from(v))),
120            path: path.to_string(),
121        })
122    }
123
124    /// List the statuses of the files/directories in the given path if the path is a directory.
125    ///
126    /// References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L105>
127    ///
128    /// FIXME: how to handle large dir? Better to return a stream instead?
129    pub async fn list_status(&self, path: &str) -> Result<Vec<FileStatus>> {
130        let (op, relative_path) = self.storage.create(path)?;
131        let base_path = &path[..path.len() - relative_path.len()];
132        // Opendal list() expects directory path to end with `/`.
133        // use normalize_root to make sure it end with `/`.
134        let list_path = normalize_root(relative_path);
135
136        let entries = op.list_with(&list_path).await.context(IoUnexpectedSnafu {
137            message: format!("Failed to list files in '{path}'"),
138        })?;
139
140        let mut statuses = Vec::new();
141        let list_path_normalized = list_path.trim_start_matches('/');
142        for entry in entries {
143            let entry_path = entry.path();
144            if entry_path.trim_start_matches('/') == list_path_normalized {
145                continue;
146            }
147            let meta = entry.metadata();
148            statuses.push(FileStatus {
149                size: meta.content_length(),
150                is_dir: meta.is_dir(),
151                path: format!("{base_path}{entry_path}"),
152                last_modified: meta
153                    .last_modified()
154                    .map(|v| DateTime::<Utc>::from(SystemTime::from(v))),
155            });
156        }
157
158        Ok(statuses)
159    }
160
161    /// Check if exists.
162    ///
163    /// References: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L128>
164    pub async fn exists(&self, path: &str) -> Result<bool> {
165        let (op, relative_path) = self.storage.create(path)?;
166
167        op.exists(relative_path).await.context(IoUnexpectedSnafu {
168            message: format!("Failed to check existence of '{path}'"),
169        })
170    }
171
172    /// Delete a file.
173    ///
174    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L139>
175    pub async fn delete_file(&self, path: &str) -> Result<()> {
176        let (op, relative_path) = self.storage.create(path)?;
177
178        op.delete(relative_path).await.context(IoUnexpectedSnafu {
179            message: format!("Failed to delete file '{path}'"),
180        })?;
181
182        Ok(())
183    }
184
185    /// Delete a dir recursively.
186    ///
187    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L139>
188    pub async fn delete_dir(&self, path: &str) -> Result<()> {
189        let (op, relative_path) = self.storage.create(path)?;
190
191        op.remove_all(relative_path)
192            .await
193            .context(IoUnexpectedSnafu {
194                message: format!("Failed to delete directory '{path}'"),
195            })?;
196
197        Ok(())
198    }
199
200    /// Make the given file and all non-existent parents into directories.
201    ///
202    /// Has the semantics of Unix 'mkdir -p'. Existence of the directory hierarchy is not an error.
203    ///
204    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L150>
205    pub async fn mkdirs(&self, path: &str) -> Result<()> {
206        let (op, relative_path) = self.storage.create(path)?;
207        // Opendal create_dir expects the path to end with `/` to indicate a directory.
208        let dir_path = normalize_root(relative_path);
209        op.create_dir(&dir_path).await.context(IoUnexpectedSnafu {
210            message: format!("Failed to create directory '{path}'"),
211        })?;
212
213        Ok(())
214    }
215
216    /// Renames the file/directory src to dst.
217    ///
218    /// Reference: <https://github.com/apache/paimon/blob/release-0.8.2/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java#L159>
219    pub async fn rename(&self, src: &str, dst: &str) -> Result<()> {
220        let (op_src, relative_path_src) = self.storage.create(src)?;
221        let (_, relative_path_dst) = self.storage.create(dst)?;
222
223        op_src
224            .rename(relative_path_src, relative_path_dst)
225            .await
226            .context(IoUnexpectedSnafu {
227                message: format!("Failed to rename '{src}' to '{dst}'"),
228            })?;
229
230        Ok(())
231    }
232}
233
/// Return `true` when `path` starts like a Windows drive path: an ASCII letter, a colon,
/// then either `\` or `/` (for example `C:\data` or `d:/data`).
fn looks_like_windows_drive_path(path: &str) -> bool {
    match path.as_bytes() {
        [drive, b':', b'\\' | b'/', ..] => drive.is_ascii_alphabetic(),
        _ => false,
    }
}
241
242#[derive(Debug)]
243pub struct FileIOBuilder {
244    scheme_str: Option<String>,
245    props: HashMap<String, String>,
246}
247
248impl FileIOBuilder {
249    pub fn new(scheme_str: impl ToString) -> Self {
250        Self {
251            scheme_str: Some(scheme_str.to_string()),
252            props: HashMap::default(),
253        }
254    }
255
256    pub(crate) fn into_parts(self) -> (String, HashMap<String, String>) {
257        (self.scheme_str.unwrap_or_default(), self.props)
258    }
259
260    pub fn with_prop(mut self, key: impl ToString, value: impl ToString) -> Self {
261        self.props.insert(key.to_string(), value.to_string());
262        self
263    }
264
265    pub fn with_props(
266        mut self,
267        args: impl IntoIterator<Item = (impl ToString, impl ToString)>,
268    ) -> Self {
269        self.props
270            .extend(args.into_iter().map(|e| (e.0.to_string(), e.1.to_string())));
271        self
272    }
273
274    pub fn build(self) -> crate::Result<FileIO> {
275        let storage = Storage::build(self)?;
276        Ok(FileIO {
277            storage: Arc::new(storage),
278        })
279    }
280}
281
282#[async_trait::async_trait]
283pub trait FileRead: Send + Sync + Unpin + 'static {
284    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
285}
286
287#[async_trait::async_trait]
288impl FileRead for opendal::Reader {
289    async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
290        Ok(opendal::Reader::read(self, range).await?.to_bytes())
291    }
292}
293
294#[async_trait::async_trait]
295pub trait FileWrite: Send + Unpin + 'static {
296    async fn write(&mut self, bs: Bytes) -> crate::Result<()>;
297
298    async fn close(&mut self) -> crate::Result<()>;
299}
300
301#[async_trait::async_trait]
302impl FileWrite for opendal::Writer {
303    async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
304        Ok(opendal::Writer::write(self, bs).await?)
305    }
306
307    async fn close(&mut self) -> crate::Result<()> {
308        opendal::Writer::close(self).await?;
309        Ok(())
310    }
311}
312
313/// Async streaming writer trait for format-level writers (e.g. parquet).
314pub trait AsyncFileWrite: tokio::io::AsyncWrite + Unpin + Send {}
315
316impl<T: tokio::io::AsyncWrite + Unpin + Send> AsyncFileWrite for T {}
317
318#[derive(Clone, Debug)]
319pub struct FileStatus {
320    pub size: u64,
321    pub is_dir: bool,
322    pub path: String,
323    pub last_modified: Option<DateTime<Utc>>,
324}
325
326#[derive(Debug)]
327pub struct InputFile {
328    op: Operator,
329    path: String,
330    relative_path_pos: usize,
331}
332
333impl InputFile {
334    pub fn location(&self) -> &str {
335        &self.path
336    }
337
338    pub async fn exists(&self) -> crate::Result<bool> {
339        Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
340    }
341
342    pub async fn metadata(&self) -> crate::Result<FileStatus> {
343        let meta = self.op.stat(&self.path[self.relative_path_pos..]).await?;
344
345        Ok(FileStatus {
346            size: meta.content_length(),
347            is_dir: meta.is_dir(),
348            path: self.path.clone(),
349            last_modified: meta
350                .last_modified()
351                .map(|v| DateTime::<Utc>::from(SystemTime::from(v))),
352        })
353    }
354
355    pub async fn read(&self) -> crate::Result<Bytes> {
356        Ok(self
357            .op
358            .read(&self.path[self.relative_path_pos..])
359            .await?
360            .to_bytes())
361    }
362
363    pub async fn reader(&self) -> crate::Result<impl FileRead> {
364        Ok(self.op.reader(&self.path[self.relative_path_pos..]).await?)
365    }
366}
367
368#[derive(Debug, Clone)]
369pub struct OutputFile {
370    op: Operator,
371    path: String,
372    relative_path_pos: usize,
373}
374
375impl OutputFile {
376    pub fn location(&self) -> &str {
377        &self.path
378    }
379
380    pub async fn exists(&self) -> crate::Result<bool> {
381        Ok(self.op.exists(&self.path[self.relative_path_pos..]).await?)
382    }
383
384    pub fn to_input_file(self) -> InputFile {
385        InputFile {
386            op: self.op,
387            path: self.path,
388            relative_path_pos: self.relative_path_pos,
389        }
390    }
391
392    pub async fn write(&self, bs: Bytes) -> crate::Result<()> {
393        let mut writer = self.writer().await?;
394        writer.write(bs).await?;
395        writer.close().await
396    }
397
398    pub async fn writer(&self) -> crate::Result<Box<dyn FileWrite>> {
399        Ok(Box::new(self.opendal_writer().await?))
400    }
401
402    /// Get an async streaming writer for format-level writes (e.g. parquet).
403    pub(crate) async fn async_writer(&self) -> crate::Result<Box<dyn AsyncFileWrite>> {
404        Ok(Box::new(
405            self.opendal_writer()
406                .await?
407                .into_futures_async_write()
408                .compat_write(),
409        ))
410    }
411
412    async fn opendal_writer(&self) -> crate::Result<opendal::Writer> {
413        Ok(self.op.writer(&self.path[self.relative_path_pos..]).await?)
414    }
415}
416
#[cfg(test)]
mod file_action_test {
    use std::collections::BTreeSet;
    use std::fs;
    use tempfile::tempdir;

    use super::*;
    use bytes::Bytes;

    /// `FileIO` built for the `memory` scheme.
    fn setup_memory_file_io() -> FileIO {
        FileIOBuilder::new("memory").build().unwrap()
    }

    /// `FileIO` built for the `file` scheme.
    fn setup_fs_file_io() -> FileIO {
        FileIOBuilder::new("file").build().unwrap()
    }

    /// Turn a local path into the `file:/...` form used by these tests.
    fn local_file_path(path: &std::path::Path) -> String {
        let normalized = path.to_string_lossy().replace('\\', "/");
        if normalized.starts_with('/') {
            format!("file:{normalized}")
        } else {
            format!("file:/{normalized}")
        }
    }

    /// Inverse of [`local_file_path`]: map a `file:/...` test path back to a local path.
    ///
    /// The leading `/` is kept for Unix-style paths (`file:/tmp/x` -> `/tmp/x`), so cleanup
    /// targets the absolute directory instead of a path relative to the working directory.
    /// A drive-letter form drops it (`file:/C:/x` -> `C:/x`). Returns `None` for any other
    /// input.
    fn local_fs_path(uri: &str) -> Option<&str> {
        let rest = uri
            .strip_prefix("file:")
            .filter(|rest| rest.starts_with('/'))?;
        match rest.as_bytes() {
            [b'/', drive, b':', ..] if drive.is_ascii_alphabetic() => Some(&rest[1..]),
            _ => Some(rest),
        }
    }

    /// Write the 11-byte payload `hello world` to `path` and close the writer.
    async fn write_hello_world(file_io: &FileIO, path: &str) {
        let output = file_io.new_output(path).unwrap();
        let mut writer = output.writer().await.unwrap();
        writer.write(Bytes::from("hello world")).await.unwrap();
        writer.close().await.unwrap();
    }

    async fn common_test_get_status(file_io: &FileIO, path: &str) {
        write_hello_world(file_io, path).await;

        let status = file_io.get_status(path).await.unwrap();
        assert_eq!(status.size, 11);

        file_io.delete_file(path).await.unwrap();
    }

    async fn common_test_exists(file_io: &FileIO, path: &str) {
        write_hello_world(file_io, path).await;

        let exists = file_io.exists(path).await.unwrap();
        assert!(exists);

        file_io.delete_file(path).await.unwrap();
    }

    async fn common_test_delete_file(file_io: &FileIO, path: &str) {
        write_hello_world(file_io, path).await;

        file_io.delete_file(path).await.unwrap();

        let exists = file_io.exists(path).await.unwrap();
        assert!(!exists);
    }

    async fn common_test_mkdirs(file_io: &FileIO, dir_path: &str) {
        file_io.mkdirs(dir_path).await.unwrap();

        let exists = file_io.exists(dir_path).await.unwrap();
        assert!(exists);

        // Best-effort cleanup of the directory that was really created on disk.
        let local_dir = local_fs_path(dir_path).expect("mkdirs test expects a `file:/` path");
        let _ = fs::remove_dir_all(local_dir);
    }

    async fn common_test_rename(file_io: &FileIO, src: &str, dst: &str) {
        write_hello_world(file_io, src).await;

        file_io.rename(src, dst).await.unwrap();

        let exists_old = file_io.exists(src).await.unwrap();
        let exists_new = file_io.exists(dst).await.unwrap();
        assert!(!exists_old);
        assert!(exists_new);

        file_io.delete_file(dst).await.unwrap();
    }

    async fn common_test_list_status_paths(file_io: &FileIO, dir_path: &str) {
        // Remove leftovers of an earlier, possibly aborted, run (local fs paths only).
        if let Some(local_dir) = local_fs_path(dir_path) {
            let _ = fs::remove_dir_all(local_dir);
        }

        file_io.mkdirs(dir_path).await.unwrap();

        let file_a = format!("{dir_path}a.txt");
        let file_b = format!("{dir_path}b.txt");
        for file in [&file_a, &file_b] {
            file_io
                .new_output(file)
                .unwrap()
                .write(Bytes::from("test data"))
                .await
                .unwrap();
        }

        let statuses = file_io.list_status(dir_path).await.unwrap();
        assert_eq!(statuses.len(), 2);

        let expected_paths: BTreeSet<String> =
            [file_a.clone(), file_b.clone()].into_iter().collect();
        let actual_paths: BTreeSet<String> =
            statuses.iter().map(|status| status.path.clone()).collect();
        assert_eq!(
            actual_paths, expected_paths,
            "list_status should return exact entry paths"
        );

        file_io.delete_dir(dir_path).await.unwrap();
    }

    #[test]
    fn test_local_fs_path_maps_file_uri_to_local_path() {
        assert_eq!(local_fs_path("file:/tmp/dir/"), Some("/tmp/dir/"));
        assert_eq!(local_fs_path("file:/C:/tmp/dir/"), Some("C:/tmp/dir/"));
        assert_eq!(local_fs_path("memory:/tmp/dir/"), None);
        assert_eq!(local_fs_path("file:relative"), None);
    }

    #[tokio::test]
    async fn test_delete_file_memory() {
        let file_io = setup_memory_file_io();
        common_test_delete_file(&file_io, "memory:/test_file_delete_mem").await;
    }

    #[tokio::test]
    async fn test_empty_path_should_return_error_for_exists_fs() {
        let file_io = setup_fs_file_io();
        let result = file_io.exists("").await;
        assert!(matches!(result, Err(Error::ConfigInvalid { .. })));
    }

    #[tokio::test]
    async fn test_empty_path_should_return_error_for_exists_memory() {
        let file_io = setup_memory_file_io();
        let result = file_io.exists("").await;
        assert!(matches!(result, Err(Error::ConfigInvalid { .. })));
    }

    #[tokio::test]
    async fn test_memory_operator_reuse_across_file_io_calls() {
        let file_io = setup_memory_file_io();
        let path = "memory:/tmp/reuse_case";
        let dir = "memory:/tmp/";

        file_io
            .new_output(path)
            .unwrap()
            .write(Bytes::from("data"))
            .await
            .unwrap();

        assert!(file_io.exists(path).await.unwrap());
        assert_eq!(file_io.get_status(path).await.unwrap().size, 4);
        assert!(file_io
            .list_status(dir)
            .await
            .unwrap()
            .iter()
            .any(|status| status.path == path));

        file_io.delete_dir(dir).await.unwrap();
    }

    #[tokio::test]
    async fn test_memory_operator_not_shared_between_file_io_instances() {
        let file_io_1 = setup_memory_file_io();
        let file_io_2 = setup_memory_file_io();
        let path = "memory:/tmp/reuse_isolation_case";

        file_io_1
            .new_output(path)
            .unwrap()
            .write(Bytes::from("data"))
            .await
            .unwrap();

        assert!(file_io_1.exists(path).await.unwrap());
        assert!(!file_io_2.exists(path).await.unwrap());
    }

    #[tokio::test]
    async fn test_get_status_fs() {
        let file_io = setup_fs_file_io();
        common_test_get_status(&file_io, "file:/tmp/test_file_get_status_fs").await;
    }

    #[tokio::test]
    async fn test_exists_fs() {
        let file_io = setup_fs_file_io();
        common_test_exists(&file_io, "file:/tmp/test_file_exists_fs").await;
    }

    #[tokio::test]
    async fn test_delete_file_fs() {
        let file_io = setup_fs_file_io();
        common_test_delete_file(&file_io, "file:/tmp/test_file_delete_fs").await;
    }

    #[tokio::test]
    async fn test_mkdirs_fs() {
        let file_io = setup_fs_file_io();
        common_test_mkdirs(&file_io, "file:/tmp/test_fs_dir/").await;
    }

    #[tokio::test]
    async fn test_rename_fs() {
        let file_io = setup_fs_file_io();
        common_test_rename(
            &file_io,
            "file:/tmp/test_file_fs_z",
            "file:/tmp/new_test_file_fs_o",
        )
        .await;
    }

    #[tokio::test]
    async fn test_list_status_fs_should_return_entry_paths() {
        let file_io = setup_fs_file_io();
        common_test_list_status_paths(&file_io, "file:/tmp/test_list_status_paths_fs/").await;
    }

    #[test]
    fn test_from_path_detects_local_fs_path() {
        let dir = tempdir().unwrap();
        let file_io = FileIO::from_path(dir.path().to_string_lossy())
            .unwrap()
            .build()
            .unwrap();
        let path = local_file_path(&dir.path().join("from_path_detects_local_fs_path.txt"));

        let rt = tokio::runtime::Runtime::new().unwrap();
        rt.block_on(async {
            file_io
                .new_output(&path)
                .unwrap()
                .write(Bytes::from("data"))
                .await
                .unwrap();
            assert!(file_io.exists(&path).await.unwrap());
        });
    }
}
661
#[cfg(test)]
mod input_output_test {
    use super::*;
    use bytes::Bytes;

    /// `FileIO` built for the `memory` scheme.
    fn setup_memory_file_io() -> FileIO {
        FileIOBuilder::new("memory").build().unwrap()
    }

    /// `FileIO` built for the `file` scheme.
    fn setup_fs_file_io() -> FileIO {
        FileIOBuilder::new("file").build().unwrap()
    }

    /// Create the output file at `path`, write `hello world` through a writer, close the
    /// writer, and hand the output file back for further checks.
    async fn written_output(file_io: &FileIO, path: &str) -> OutputFile {
        let output = file_io.new_output(path).unwrap();
        let mut sink = output.writer().await.unwrap();
        sink.write(Bytes::from("hello world")).await.unwrap();
        sink.close().await.unwrap();
        output
    }

    async fn common_test_output_file_write_and_read(file_io: &FileIO, path: &str) {
        let input = written_output(file_io, path).await.to_input_file();

        let content = input.read().await.unwrap();
        assert_eq!(&content[..], b"hello world");

        file_io.delete_file(path).await.unwrap();
    }

    async fn common_test_output_file_exists(file_io: &FileIO, path: &str) {
        let output = written_output(file_io, path).await;

        assert!(output.exists().await.unwrap());

        file_io.delete_file(path).await.unwrap();
    }

    async fn common_test_input_file_metadata(file_io: &FileIO, path: &str) {
        let input = written_output(file_io, path).await.to_input_file();

        let metadata = input.metadata().await.unwrap();
        assert_eq!(metadata.size, 11);

        file_io.delete_file(path).await.unwrap();
    }

    async fn common_test_input_file_partial_read(file_io: &FileIO, path: &str) {
        let input = written_output(file_io, path).await.to_input_file();

        let reader = input.reader().await.unwrap();
        // The first five bytes are "hello".
        let head = reader.read(0..5).await.unwrap();
        assert_eq!(&head[..], b"hello");

        file_io.delete_file(path).await.unwrap();
    }

    #[tokio::test]
    async fn test_output_file_write_and_read_memory() {
        let io = setup_memory_file_io();
        common_test_output_file_write_and_read(&io, "memory:/test_file_rw_mem").await;
    }

    #[tokio::test]
    async fn test_output_file_exists_memory() {
        let io = setup_memory_file_io();
        common_test_output_file_exists(&io, "memory:/test_file_exist_mem").await;
    }

    #[tokio::test]
    async fn test_input_file_metadata_memory() {
        let io = setup_memory_file_io();
        common_test_input_file_metadata(&io, "memory:/test_file_meta_mem").await;
    }

    #[tokio::test]
    async fn test_input_file_partial_read_memory() {
        let io = setup_memory_file_io();
        common_test_input_file_partial_read(&io, "memory:/test_file_part_read_mem").await;
    }

    #[tokio::test]
    async fn test_output_file_write_and_read_fs() {
        let io = setup_fs_file_io();
        common_test_output_file_write_and_read(&io, "file:/tmp/test_file_fs_rw").await;
    }

    #[tokio::test]
    async fn test_output_file_exists_fs() {
        let io = setup_fs_file_io();
        common_test_output_file_exists(&io, "file:/tmp/test_file_exists").await;
    }

    #[tokio::test]
    async fn test_input_file_metadata_fs() {
        let io = setup_fs_file_io();
        common_test_input_file_metadata(&io, "file:/tmp/test_file_meta").await;
    }

    #[tokio::test]
    async fn test_input_file_partial_read_fs() {
        let io = setup_fs_file_io();
        common_test_input_file_partial_read(&io, "file:/tmp/test_file_read_fs").await;
    }
}