fluvio_future/fs/
extension.rs1use std::io::Error as IoError;
2use std::io::ErrorKind;
3use std::io::SeekFrom;
4
5#[cfg(unix)]
6use std::os::unix::io::AsRawFd;
7
8use async_trait::async_trait;
9use futures_lite::AsyncSeekExt;
10
11use tracing::trace;
12
13#[cfg(unix)]
14use crate::file_slice::AsyncFileSlice;
15
16use super::File;
17
18#[async_trait]
20pub trait AsyncFileExtension {
21 async fn reset_to_beginning(&mut self) -> Result<(), IoError>;
22
23 #[cfg(unix)]
24 fn raw_slice(&self, position: u64, len: u64) -> AsyncFileSlice;
25
26 #[cfg(unix)]
27 async fn as_slice(
28 &self,
29 position: u64,
30 desired_len_opt: Option<u64>,
31 ) -> Result<AsyncFileSlice, IoError>;
32}
33
34#[async_trait]
35impl AsyncFileExtension for File {
36 async fn reset_to_beginning(&mut self) -> Result<(), IoError> {
37 self.seek(SeekFrom::Start(0)).await.map(|_| ())
38 }
39
40 #[cfg(unix)]
42 fn raw_slice(&self, position: u64, len: u64) -> AsyncFileSlice {
43 AsyncFileSlice::new(self.as_raw_fd(), position, len)
44 }
45
46 #[cfg(unix)]
49 async fn as_slice(
50 &self,
51 position: u64,
52 desired_len_opt: Option<u64>,
53 ) -> Result<AsyncFileSlice, IoError> {
54 let metadata = self.metadata().await?;
55 let len = metadata.len();
56 if position >= len {
57 return Err(IoError::new(
58 ErrorKind::UnexpectedEof,
59 "position is greater than available len",
60 ));
61 }
62 let slice_len = if let Some(desired_len) = desired_len_opt {
63 if position + desired_len >= len {
64 return Err(IoError::new(
65 ErrorKind::UnexpectedEof,
66 "not available bytes",
67 ));
68 }
69 desired_len
70 } else {
71 len - position
72 };
73
74 trace!("file trace: position: {}, len: {}", position, len);
75
76 Ok(self.raw_slice(position, slice_len))
77 }
78}
79
#[cfg(test)]
mod tests {

    use std::env::temp_dir;
    use std::fs::File;
    use std::io::Error as IoError;
    use std::io::Read;
    use std::io::Seek as _;
    use std::io::SeekFrom;
    use std::io::Write;

    use flv_util::fixture::ensure_clean_file;

    use super::AsyncFileExtension;
    use crate::fs::util as file_util;
    use crate::test_async;
    use futures_lite::AsyncReadExt;
    use futures_lite::AsyncSeekExt;
    use futures_lite::AsyncWriteExt;

    /// Baseline using the blocking std API: seek to the start, write, then
    /// read the content back through a second handle.
    #[test]
    fn test_sync_seek_write() -> Result<(), std::io::Error> {
        let mut option = std::fs::OpenOptions::new();
        option.read(true).write(true).create(true).append(false);
        let test_file = temp_dir().join("x1");
        // Start from a clean slate like the async tests do: the file is
        // opened without truncation, so leftover bytes from an earlier run
        // would otherwise survive past the 4 bytes written here and break
        // the content assertion.
        ensure_clean_file(&test_file);
        let mut file = option.open(&test_file)?;
        file.seek(SeekFrom::Start(0))?;
        file.write_all(b"test")?;
        file.sync_all()?;

        let mut f2 = File::open(&test_file)?;
        let mut contents = String::new();
        f2.read_to_string(&mut contents)?;
        assert_eq!(contents, "test");
        Ok(())
    }

    /// Creates the same path twice, writing different content each time, and
    /// expects only the second write to be present afterwards.
    #[test_async]
    async fn file_multiple_overwrite() -> Result<(), IoError> {
        let test_file_path = temp_dir().join("file_write_test");
        ensure_clean_file(&test_file_path);

        let mut file = file_util::create(&test_file_path).await?;
        file.seek(SeekFrom::Start(0)).await?;
        file.write_all(b"test").await?;
        file.sync_all().await?;

        // Second pass over the same path with new content.
        let mut file = file_util::create(&test_file_path).await?;
        file.seek(SeekFrom::Start(0)).await?;
        file.write_all(b"xyzt").await?;
        file.sync_all().await?;

        let mut output = Vec::new();
        let mut file = file_util::open(&test_file_path).await?;
        file.read_to_end(&mut output).await?;
        assert_eq!(output.len(), 4);
        let contents = String::from_utf8(output).expect("conversion");
        assert_eq!(contents, "xyzt");

        Ok(())
    }

    /// Writes through a read/write handle, rewinds, and reads the same bytes
    /// back through that handle.
    #[test_async]
    async fn async_file_write_read_same() -> Result<(), IoError> {
        let test_file_path = temp_dir().join("read_write_test");
        ensure_clean_file(&test_file_path);

        let mut output = Vec::new();
        let mut file = file_util::open_read_write(&test_file_path).await?;
        file.write_all(b"test").await?;
        file.seek(SeekFrom::Start(0)).await?;
        file.read_to_end(&mut output).await?;
        assert_eq!(output.len(), 4);
        let contents = String::from_utf8(output).expect("conversion");
        assert_eq!(contents, "test");

        Ok(())
    }

    /// With a read/append handle, the second write is expected to land after
    /// the first one even though the cursor was moved to the start in between.
    #[test_async]
    async fn async_file_write_append_same() -> Result<(), IoError> {
        let test_file_path = temp_dir().join("read_append_test");
        ensure_clean_file(&test_file_path);

        let mut output = Vec::new();
        let mut file = file_util::open_read_append(&test_file_path).await?;
        file.write_all(b"test").await?;
        file.seek(SeekFrom::Start(0)).await?;
        file.write_all(b"xyz").await?;
        file.seek(SeekFrom::Start(0)).await?;
        file.read_to_end(&mut output).await?;
        assert_eq!(output.len(), 7);
        let contents = String::from_utf8(output).expect("conversion");
        assert_eq!(contents, "testxyz");

        Ok(())
    }

    /// Slicing a fixture file from offset 0 with no explicit length should
    /// cover the whole file (30 bytes for this fixture).
    #[cfg(unix)]
    #[test_async]
    async fn test_as_slice() -> Result<(), IoError> {
        let file = file_util::open("test-data/apirequest.bin").await?;
        let f_slice = file.as_slice(0, None).await?;
        assert_eq!(f_slice.position(), 0);
        assert_eq!(f_slice.len(), 30);
        Ok(())
    }
}