fluvio_future/fs/
bounded.rs1use core::pin::Pin;
2use core::task::Context;
3use core::task::Poll;
4
5use std::io;
6use std::io::Error as IoError;
7use std::path::Path;
8use std::path::PathBuf;
9
10use std::fmt;
11
12use tracing::trace;
13
14use pin_utils::unsafe_pinned;
15use pin_utils::unsafe_unpinned;
16
17use async_fs::File;
18
19#[cfg(unix)]
20use crate::file_slice::AsyncFileSlice;
21
22use super::AsyncFileExtension;
23use futures_lite::AsyncWrite;
24
/// Errors associated with a [`BoundedFileSink`].
#[derive(Debug)]
pub enum BoundedFileSinkError {
    /// An underlying I/O operation failed; produced by the `From<io::Error>` conversion.
    IoError(io::Error),
    /// The configured maximum length was reached.
    ///
    /// NOTE(review): not constructed by any code visible in this file — presumably
    /// raised by callers after `can_be_appended` returns `false`; confirm.
    MaxLenReached,
}

impl fmt::Display for BoundedFileSinkError {
    /// Prints the wrapped I/O error's own message, or a fixed text for `MaxLenReached`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::IoError(err) => write!(f, "{err}"),
            Self::MaxLenReached => write!(f, "max len reached"),
        }
    }
}

impl std::error::Error for BoundedFileSinkError {
    /// Forwards to the wrapped error's own source.
    ///
    /// `Display` already prints the wrapped I/O error's message, so returning that
    /// error itself here would duplicate it in error-chain reports.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::IoError(err) => err.source(),
            Self::MaxLenReached => None,
        }
    }
}

impl From<io::Error> for BoundedFileSinkError {
    /// Wraps an I/O error so `?` can be used on I/O results in functions returning this type.
    fn from(error: io::Error) -> Self {
        Self::IoError(error)
    }
}
45
/// Options supplied when constructing a `BoundedFileSink`.
#[derive(Debug, Clone, Default)]
pub struct BoundedFileOption {
    /// Maximum length in bytes checked by `BoundedFileSink::can_be_appended`.
    /// `None` (the default) means there is no limit.
    pub max_len: Option<u64>,
}
50
51pub struct BoundedFileSink {
55 option: BoundedFileOption,
56 current_len: u64,
57 writer: File,
58 path: PathBuf,
59}
60
61impl Unpin for BoundedFileSink {}
62
63impl BoundedFileSink {
64 unsafe_pinned!(writer: File);
65 unsafe_unpinned!(current_len: u64);
66
67 #[allow(unused)]
68 pub async fn create<P>(path: P, option: BoundedFileOption) -> Result<Self, io::Error>
69 where
70 P: AsRef<Path>,
71 {
72 let inner_path = path.as_ref();
73 let writer = File::create(inner_path).await?;
74 Ok(Self {
75 writer,
76 path: inner_path.to_owned(),
77 current_len: 0,
78 option,
79 })
80 }
81
82 #[allow(unused)]
83 pub async fn open_write<P>(path: P, option: BoundedFileOption) -> Result<Self, io::Error>
84 where
85 P: AsRef<Path>,
86 {
87 let file_path = path.as_ref();
88 let writer = File::open(file_path).await?;
89 let metadata = writer.metadata().await?;
90 let len = metadata.len();
91
92 Ok(Self {
93 writer,
94 path: file_path.to_owned(),
95 current_len: len,
96 option,
97 })
98 }
99
100 #[allow(unused)]
101 pub async fn open_append<P>(path: P, option: BoundedFileOption) -> Result<Self, io::Error>
102 where
103 P: AsRef<Path>,
104 {
105 let file_path = path.as_ref();
106
107 let writer = crate::fs::util::open_read_append(file_path).await?;
108 let metadata = writer.metadata().await?;
109 let len = metadata.len();
110
111 Ok(Self {
112 writer,
113 path: file_path.to_owned(),
114 current_len: len,
115 option,
116 })
117 }
118
119 #[allow(unused)]
120 pub fn get_current_len(&self) -> u64 {
121 self.current_len
122 }
123
124 pub fn can_be_appended(&self, buf_len: u64) -> bool {
126 match self.option.max_len {
127 Some(max_len) => self.current_len + buf_len <= max_len,
128 None => true,
129 }
130 }
131
132 pub fn get_path(&self) -> &Path {
133 &self.path
134 }
135
136 pub fn inner(&self) -> &File {
137 &self.writer
138 }
139
140 pub fn mut_inner(&mut self) -> &mut File {
141 &mut self.writer
142 }
143
144 #[cfg(unix)]
145 pub fn slice_from(&self, position: u64, len: u64) -> Result<AsyncFileSlice, IoError> {
146 Ok(self.writer.raw_slice(position, len))
147 }
148}
149
150impl AsyncWrite for BoundedFileSink {
151 fn poll_write(
152 mut self: Pin<&mut Self>,
153 cx: &mut Context<'_>,
154 buf: &[u8],
155 ) -> Poll<io::Result<usize>> {
156 match self.as_mut().writer().poll_write(cx, buf) {
157 Poll::Pending => Poll::Pending,
158 Poll::Ready(result) => match result {
159 Ok(size) => {
160 let current_len = self.as_ref().current_len + size as u64;
161 *(self.as_mut().current_len()) = current_len;
162 trace!(
163 "success write: {}, current len: {}",
164 size,
165 self.as_ref().current_len
166 );
167 Poll::Ready(Ok(size))
168 }
169 Err(err) => Poll::Ready(Err(err)),
170 },
171 }
172 }
173
174 fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
175 self.writer().poll_flush(cx)
176 }
177
178 fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
179 self.writer().poll_close(cx)
180 }
181}
182
#[cfg(test)]
mod tests {

    use std::env::temp_dir;
    use std::fs::File as StdFile;
    use std::fs::remove_file;
    use std::io::Read;
    use std::io::SeekFrom;
    use std::path::Path;

    use tracing::debug;

    use crate::test_async;
    use futures_lite::AsyncReadExt;
    use futures_lite::AsyncSeekExt;
    use futures_lite::AsyncWriteExt;

    use super::BoundedFileOption;
    use super::BoundedFileSink;
    use super::BoundedFileSinkError;

    const TEST_FILE_NAME: &str = "file_test_01";
    const TEST_FILE_NAME2: &str = "file_test_02";
    const MAX_TEST_FILE_NAME: &str = "file_test_max";

    /// Removes a file left behind by a previous run; a missing file is not an error.
    fn ensure_clean_file(log_path: &Path) {
        debug!("removing log: {}", log_path.display());
        if remove_file(log_path).is_ok() {
            debug!("remove existing log file");
        } else {
            debug!("no existing log file");
        }
    }

    /// A single write is counted by the sink and lands in the file.
    #[test_async]
    async fn test_sink_file_write_happy_path() -> Result<(), BoundedFileSinkError> {
        let test_file = temp_dir().join(TEST_FILE_NAME);
        ensure_clean_file(&test_file);

        let mut f_sink = BoundedFileSink::create(&test_file, BoundedFileOption::default()).await?;

        let bytes = vec![0x01, 0x02, 0x03];
        f_sink.write_all(&bytes).await.expect("write bytes");
        assert_eq!(f_sink.get_current_len(), 3);

        f_sink.flush().await.expect("flush");

        let mut f = StdFile::open(&test_file)?;
        let mut buffer = vec![0; 3];
        f.read_exact(&mut buffer)?;
        assert_eq!(buffer, bytes);
        Ok(())
    }

    /// Consecutive writes accumulate in both the tracked length and the file.
    #[test_async]
    async fn test_sink_file_write_multiple_path() -> Result<(), BoundedFileSinkError> {
        let test_file = temp_dir().join(TEST_FILE_NAME2);
        ensure_clean_file(&test_file);

        let mut f_sink = BoundedFileSink::create(&test_file, BoundedFileOption::default())
            .await
            .expect("create");

        let bytes = vec![0x1; 1000];
        f_sink.write_all(&bytes).await.expect("first write");
        f_sink.write_all(&bytes).await.expect("second write");

        assert_eq!(f_sink.get_current_len(), 2000);
        f_sink.flush().await.expect("flush");

        let mut f = StdFile::open(&test_file).expect("test file should exists");
        // A single `read` may legally return fewer bytes than requested, so read
        // until EOF before checking how much was written.
        let mut buffer = Vec::with_capacity(2000);
        let len = f.read_to_end(&mut buffer)?;
        assert_eq!(len, 2000);
        Ok(())
    }

    /// `can_be_appended` honours the configured maximum length.
    #[test_async]
    async fn test_sink_file_max_reached() -> Result<(), BoundedFileSinkError> {
        let test_file = temp_dir().join(MAX_TEST_FILE_NAME);
        ensure_clean_file(&test_file);

        let option = BoundedFileOption { max_len: Some(10) };

        let mut f_sink = BoundedFileSink::create(&test_file, option)
            .await
            .expect("file created");

        let bytes = vec![0x01; 8];
        f_sink.write_all(&bytes).await.expect("first write");
        assert_eq!(f_sink.get_current_len(), 8);
        assert!(!f_sink.can_be_appended(20));
        // Exact boundary: 8 + 2 == 10 still fits, 8 + 3 does not.
        assert!(f_sink.can_be_appended(2));
        assert!(!f_sink.can_be_appended(3));
        Ok(())
    }

    /// Data appended through the sink can be read back through the same handle.
    #[test_async]
    async fn test_sink_file_write_read() -> Result<(), BoundedFileSinkError> {
        const WRITE_FILE: &str = "file_test_write_bounded";

        let test_file = temp_dir().join(WRITE_FILE);
        ensure_clean_file(&test_file);

        let mut f_sink =
            BoundedFileSink::open_append(&test_file, BoundedFileOption::default()).await?;

        let bytes: Vec<u8> = vec![0x01; 73];
        f_sink.write_all(&bytes).await.expect("send success");
        debug!("current len: {}", f_sink.get_current_len());
        let bytes: Vec<u8> = vec![0x01; 74];
        f_sink.write_all(&bytes).await.expect("send success");
        assert_eq!(f_sink.get_current_len(), 147);

        // Rewind the shared handle before reading everything back.
        f_sink
            .mut_inner()
            .seek(SeekFrom::Start(0))
            .await
            .expect("reset to beginning");
        let mut read_buf: Vec<u8> = vec![];
        f_sink
            .mut_inner()
            .read_to_end(&mut read_buf)
            .await
            .expect("read");
        assert_eq!(read_buf.len(), 147);

        Ok(())
    }

    mod inner {

        use std::io::Error as IoError;
        use std::io::Write;

        use crate::test_async;

        use super::ensure_clean_file;
        use super::temp_dir;

        /// Baseline using `std::fs::File` directly: two writes produce the summed length.
        #[test_async]
        async fn test_sink_file_write_std() -> Result<(), IoError> {
            use std::fs::File;

            const WRITE_FILE: &str = "file_test_two_write_std";

            let test_file = temp_dir().join(WRITE_FILE);
            ensure_clean_file(&test_file);
            let mut f_sink = File::create(&test_file).expect("file created");

            let bytes: Vec<u8> = vec![0x01; 73];
            f_sink.write_all(&bytes).expect("send success");
            let bytes: Vec<u8> = vec![0x01; 74];
            f_sink.write_all(&bytes).expect("send success");

            let metadata = std::fs::metadata(&test_file).expect("data file should exists");

            assert_eq!(metadata.len(), 147);

            Ok(())
        }
    }
}