fluvio_future/fs/
extension.rs

1use std::io::Error as IoError;
2use std::io::ErrorKind;
3use std::io::SeekFrom;
4
5#[cfg(unix)]
6use std::os::unix::io::AsRawFd;
7
8use async_trait::async_trait;
9use futures_lite::AsyncSeekExt;
10
11use tracing::trace;
12
13#[cfg(unix)]
14use crate::file_slice::AsyncFileSlice;
15
16use super::File;
17
18/// Utilites for dealing with Async file
19#[async_trait]
20pub trait AsyncFileExtension {
21    async fn reset_to_beginning(&mut self) -> Result<(), IoError>;
22
23    #[cfg(unix)]
24    fn raw_slice(&self, position: u64, len: u64) -> AsyncFileSlice;
25
26    #[cfg(unix)]
27    async fn as_slice(
28        &self,
29        position: u64,
30        desired_len_opt: Option<u64>,
31    ) -> Result<AsyncFileSlice, IoError>;
32}
33
34#[async_trait]
35impl AsyncFileExtension for File {
36    async fn reset_to_beginning(&mut self) -> Result<(), IoError> {
37        self.seek(SeekFrom::Start(0)).await.map(|_| ())
38    }
39
40    /// return raw slice with fiel descriptor, this doesn't not check
41    #[cfg(unix)]
42    fn raw_slice(&self, position: u64, len: u64) -> AsyncFileSlice {
43        AsyncFileSlice::new(self.as_raw_fd(), position, len)
44    }
45
46    /// Extract slice of file using file descriptor
47    /// if desired len is not supplied, compute the len from metadata
48    #[cfg(unix)]
49    async fn as_slice(
50        &self,
51        position: u64,
52        desired_len_opt: Option<u64>,
53    ) -> Result<AsyncFileSlice, IoError> {
54        let metadata = self.metadata().await?;
55        let len = metadata.len();
56        if position >= len {
57            return Err(IoError::new(
58                ErrorKind::UnexpectedEof,
59                "position is greater than available len",
60            ));
61        }
62        let slice_len = if let Some(desired_len) = desired_len_opt {
63            if position + desired_len >= len {
64                return Err(IoError::new(
65                    ErrorKind::UnexpectedEof,
66                    "not available bytes",
67                ));
68            }
69            desired_len
70        } else {
71            len - position
72        };
73
74        trace!("file trace: position: {}, len: {}", position, len);
75
76        Ok(self.raw_slice(position, slice_len))
77    }
78}
79
#[cfg(test)]
mod tests {

    use std::env::temp_dir;
    use std::fs::File;
    use std::io::Error as IoError;
    use std::io::Read;
    use std::io::Seek as _;
    use std::io::SeekFrom;
    use std::io::Write;

    use flv_util::fixture::ensure_clean_file;

    use super::AsyncFileExtension;
    use crate::fs::util as file_util;
    use crate::test_async;
    use futures_lite::AsyncReadExt;
    use futures_lite::AsyncSeekExt;
    use futures_lite::AsyncWriteExt;

    /// Synchronous seek + write + read round trip.
    ///
    /// Serves as the reference behavior for the async versions below.
    #[test]
    fn test_sync_seek_write() -> Result<(), std::io::Error> {
        let mut option = std::fs::OpenOptions::new();
        // Truncate on open: without it, longer content left in the temp file
        // by anything else would survive past the 4 bytes written here and
        // break the equality check below.
        option.read(true).write(true).create(true).truncate(true);
        let test_file = temp_dir().join("x1");
        let mut file = option.open(&test_file)?;
        file.seek(SeekFrom::Start(0))?;
        file.write_all(b"test")?;
        file.sync_all()?;

        let mut f2 = File::open(&test_file)?;
        let mut contents = String::new();
        f2.read_to_string(&mut contents)?;
        assert_eq!(contents, "test");
        Ok(())
    }

    /// Writing the same file twice through `file_util::create` must leave only
    /// the second write's content.
    #[test_async]
    async fn file_multiple_overwrite() -> Result<(), IoError> {
        let test_file_path = temp_dir().join("file_write_test");
        ensure_clean_file(&test_file_path);

        // write 4 byte string
        let mut file = file_util::create(&test_file_path).await?;
        file.seek(SeekFrom::Start(0)).await?;
        file.write_all(b"test").await?;
        file.sync_all().await?;

        // go back to the beginning and overwrite with a different 4 byte string
        let mut file = file_util::create(&test_file_path).await?;
        file.seek(SeekFrom::Start(0)).await?;
        file.write_all(b"xyzt").await?;
        file.sync_all().await?;

        // read everything back through a fresh handle
        let mut output = Vec::new();
        let mut file = file_util::open(&test_file_path).await?;
        file.read_to_end(&mut output).await?;
        assert_eq!(output.len(), 4);
        let contents = String::from_utf8(output).expect("conversion");
        assert_eq!(contents, "xyzt");

        Ok(())
    }

    /// A read/write handle can read back what it just wrote after seeking to
    /// the start.
    #[test_async]
    async fn async_file_write_read_same() -> Result<(), IoError> {
        let test_file_path = temp_dir().join("read_write_test");
        ensure_clean_file(&test_file_path);

        let mut output = Vec::new();
        let mut file = file_util::open_read_write(&test_file_path).await?;
        file.write_all(b"test").await?;
        file.seek(SeekFrom::Start(0)).await?;
        file.read_to_end(&mut output).await?;
        assert_eq!(output.len(), 4);
        let contents = String::from_utf8(output).expect("conversion");
        assert_eq!(contents, "test");

        Ok(())
    }

    /// With a read/append handle, the second write is expected to land after
    /// the first even though the cursor was moved to the start in between.
    #[test_async]
    async fn async_file_write_append_same() -> Result<(), IoError> {
        let test_file_path = temp_dir().join("read_append_test");
        ensure_clean_file(&test_file_path);

        let mut output = Vec::new();
        let mut file = file_util::open_read_append(&test_file_path).await?;
        file.write_all(b"test").await?;
        file.seek(SeekFrom::Start(0)).await?;
        file.write_all(b"xyz").await?;
        file.seek(SeekFrom::Start(0)).await?;
        file.read_to_end(&mut output).await?;
        assert_eq!(output.len(), 7);
        let contents = String::from_utf8(output).expect("conversion");
        assert_eq!(contents, "testxyz");

        Ok(())
    }

    /// `as_slice` with no explicit length covers the whole fixture file,
    /// which this test expects to be 30 bytes long.
    #[cfg(unix)]
    #[test_async]
    async fn test_as_slice() -> Result<(), IoError> {
        let file = file_util::open("test-data/apirequest.bin").await?;
        let f_slice = file.as_slice(0, None).await?;
        assert_eq!(f_slice.position(), 0);
        assert_eq!(f_slice.len(), 30);
        Ok(())
    }
}