fluvio_future/fs/
bounded.rs

1use core::pin::Pin;
2use core::task::Context;
3use core::task::Poll;
4
5use std::io;
6use std::io::Error as IoError;
7use std::path::Path;
8use std::path::PathBuf;
9
10use std::fmt;
11
12use tracing::trace;
13
14use pin_utils::unsafe_pinned;
15use pin_utils::unsafe_unpinned;
16
17use async_fs::File;
18
19#[cfg(unix)]
20use crate::file_slice::AsyncFileSlice;
21
22use super::AsyncFileExtension;
23use futures_lite::AsyncWrite;
24
/// Errors surfaced when working with a bounded file sink.
#[derive(Debug)]
pub enum BoundedFileSinkError {
    /// An underlying I/O operation failed.
    IoError(io::Error),
    /// The maximum length limit was exceeded.
    MaxLenReached,
}

impl fmt::Display for BoundedFileSinkError {
    /// Formats the error; the `IoError` variant prints exactly what the wrapped
    /// `io::Error` prints.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::IoError(err) => write!(f, "{err}"),
            Self::MaxLenReached => f.write_str("max len reached"),
        }
    }
}

impl std::error::Error for BoundedFileSinkError {
    /// Returns the cause of this error, if any.
    ///
    /// `Display` for `IoError` already prints the wrapped error's own message, so
    /// the wrapper is treated as transparent and forwards the wrapped error's
    /// source. Returning the wrapped error itself would make chain reporters
    /// print the same message twice.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::IoError(err) => std::error::Error::source(err),
            Self::MaxLenReached => None,
        }
    }
}

impl From<io::Error> for BoundedFileSinkError {
    /// Wraps an `io::Error` so `?` can be used on I/O results.
    fn from(error: io::Error) -> Self {
        Self::IoError(error)
    }
}
45
/// Configuration for a bounded file sink.
///
/// The default value has no length limit.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct BoundedFileOption {
    /// Maximum length in bytes checked by `can_be_appended`; `None` means unlimited.
    pub max_len: Option<u64>,
}
50
51/// File Sink that tracks how much byte it has written
52/// This file will not block write operation.  It is up to writer to check if maximum file has size is reached
53/// since AsyncWrite return IoError
54pub struct BoundedFileSink {
55    option: BoundedFileOption,
56    current_len: u64,
57    writer: File,
58    path: PathBuf,
59}
60
61impl Unpin for BoundedFileSink {}
62
63impl BoundedFileSink {
64    unsafe_pinned!(writer: File);
65    unsafe_unpinned!(current_len: u64);
66
67    #[allow(unused)]
68    pub async fn create<P>(path: P, option: BoundedFileOption) -> Result<Self, io::Error>
69    where
70        P: AsRef<Path>,
71    {
72        let inner_path = path.as_ref();
73        let writer = File::create(inner_path).await?;
74        Ok(Self {
75            writer,
76            path: inner_path.to_owned(),
77            current_len: 0,
78            option,
79        })
80    }
81
82    #[allow(unused)]
83    pub async fn open_write<P>(path: P, option: BoundedFileOption) -> Result<Self, io::Error>
84    where
85        P: AsRef<Path>,
86    {
87        let file_path = path.as_ref();
88        let writer = File::open(file_path).await?;
89        let metadata = writer.metadata().await?;
90        let len = metadata.len();
91
92        Ok(Self {
93            writer,
94            path: file_path.to_owned(),
95            current_len: len,
96            option,
97        })
98    }
99
100    #[allow(unused)]
101    pub async fn open_append<P>(path: P, option: BoundedFileOption) -> Result<Self, io::Error>
102    where
103        P: AsRef<Path>,
104    {
105        let file_path = path.as_ref();
106
107        let writer = crate::fs::util::open_read_append(file_path).await?;
108        let metadata = writer.metadata().await?;
109        let len = metadata.len();
110
111        Ok(Self {
112            writer,
113            path: file_path.to_owned(),
114            current_len: len,
115            option,
116        })
117    }
118
119    #[allow(unused)]
120    pub fn get_current_len(&self) -> u64 {
121        self.current_len
122    }
123
124    /// check if buf_len can be written
125    pub fn can_be_appended(&self, buf_len: u64) -> bool {
126        match self.option.max_len {
127            Some(max_len) => self.current_len + buf_len <= max_len,
128            None => true,
129        }
130    }
131
132    pub fn get_path(&self) -> &Path {
133        &self.path
134    }
135
136    pub fn inner(&self) -> &File {
137        &self.writer
138    }
139
140    pub fn mut_inner(&mut self) -> &mut File {
141        &mut self.writer
142    }
143
144    #[cfg(unix)]
145    pub fn slice_from(&self, position: u64, len: u64) -> Result<AsyncFileSlice, IoError> {
146        Ok(self.writer.raw_slice(position, len))
147    }
148}
149
150impl AsyncWrite for BoundedFileSink {
151    fn poll_write(
152        mut self: Pin<&mut Self>,
153        cx: &mut Context<'_>,
154        buf: &[u8],
155    ) -> Poll<io::Result<usize>> {
156        match self.as_mut().writer().poll_write(cx, buf) {
157            Poll::Pending => Poll::Pending,
158            Poll::Ready(result) => match result {
159                Ok(size) => {
160                    let current_len = self.as_ref().current_len + size as u64;
161                    *(self.as_mut().current_len()) = current_len;
162                    trace!(
163                        "success write: {}, current len: {}",
164                        size,
165                        self.as_ref().current_len
166                    );
167                    Poll::Ready(Ok(size))
168                }
169                Err(err) => Poll::Ready(Err(err)),
170            },
171        }
172    }
173
174    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
175        self.writer().poll_flush(cx)
176    }
177
178    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
179        self.writer().poll_close(cx)
180    }
181}
182
#[cfg(test)]
mod tests {

    use std::env::temp_dir;
    use std::fs::File as StdFile;
    use std::fs::remove_file;
    use std::io::Read;
    use std::io::SeekFrom;
    use std::path::Path;

    use tracing::debug;

    use crate::test_async;
    use futures_lite::AsyncReadExt;
    use futures_lite::AsyncSeekExt;
    use futures_lite::AsyncWriteExt;

    use super::BoundedFileOption;
    use super::BoundedFileSink;
    use super::BoundedFileSinkError;

    const TEST_FILE_NAME: &str = "file_test_01";
    const TEST_FILE_NAME2: &str = "file_test_02";
    const MAX_TEST_FILE_NAME: &str = "file_test_max";

    /// Removes `log_path` if it exists so every test starts from a fresh file.
    fn ensure_clean_file(log_path: &Path) {
        debug!("removing log: {}", log_path.display());
        // delete message log if it exists
        if remove_file(log_path).is_ok() {
            debug!("remove existing log file");
        } else {
            debug!("no existing log file");
        }
    }

    /// A single small write is counted and lands in the file.
    #[test_async]
    async fn test_sink_file_write_happy_path() -> Result<(), BoundedFileSinkError> {
        let test_file = temp_dir().join(TEST_FILE_NAME);
        ensure_clean_file(&test_file);

        let mut f_sink = BoundedFileSink::create(&test_file, BoundedFileOption::default()).await?;

        let bytes = vec![0x01, 0x02, 0x03];
        f_sink.write_all(&bytes).await.expect("write bytes");
        assert_eq!(f_sink.get_current_len(), 3);

        f_sink.flush().await.expect("flush");

        let mut f = StdFile::open(&test_file)?;
        let mut buffer = vec![0; 3];
        f.read_exact(&mut buffer)?;
        assert_eq!(buffer, bytes);
        Ok(())
    }

    /// Consecutive writes accumulate in both the tracked length and the file.
    #[test_async]
    async fn test_sink_file_write_multiple_path() -> Result<(), BoundedFileSinkError> {
        let test_file = temp_dir().join(TEST_FILE_NAME2);
        ensure_clean_file(&test_file);

        let mut f_sink = BoundedFileSink::create(&test_file, BoundedFileOption::default())
            .await
            .expect("create");

        let bytes = vec![0x1; 1000];
        f_sink.write_all(&bytes).await.expect("first write");
        f_sink.write_all(&bytes).await.expect("second write");

        assert_eq!(f_sink.get_current_len(), 2000);
        f_sink.flush().await.expect("flush");

        let mut f = StdFile::open(&test_file).expect("test file should exists");
        // A single `read` call may return fewer bytes than requested, so drain the
        // file instead; this also catches a file that is longer than expected.
        let mut contents = Vec::with_capacity(2000);
        f.read_to_end(&mut contents)?;
        assert_eq!(contents.len(), 2000);
        Ok(())
    }

    /// `can_be_appended` honours the configured maximum, including the exact boundary.
    #[test_async]
    async fn test_sink_file_max_reached() -> Result<(), BoundedFileSinkError> {
        let test_file = temp_dir().join(MAX_TEST_FILE_NAME);
        ensure_clean_file(&test_file);

        let option = BoundedFileOption { max_len: Some(10) };

        let mut f_sink = BoundedFileSink::create(&test_file, option)
            .await
            .expect("file created");

        let bytes = vec![0x01; 8];
        f_sink.write_all(&bytes).await.expect("first write");
        assert_eq!(f_sink.get_current_len(), 8);
        assert!(!f_sink.can_be_appended(20));
        // 8 + 2 lands exactly on the limit, 8 + 3 is one byte over it.
        assert!(f_sink.can_be_appended(2));
        assert!(!f_sink.can_be_appended(3));
        Ok(())
    }

    /// Data written through an append-mode sink can be read back via the inner file.
    #[test_async]
    async fn test_sink_file_write_read() -> Result<(), BoundedFileSinkError> {
        const WRITE_FILE: &str = "file_test_write_bounded";

        let test_file = temp_dir().join(WRITE_FILE);
        ensure_clean_file(&test_file);

        let mut f_sink =
            BoundedFileSink::open_append(&test_file, BoundedFileOption::default()).await?;

        let bytes: Vec<u8> = vec![0x01; 73];
        f_sink.write_all(&bytes).await.expect("send success");
        debug!("current len: {}", f_sink.get_current_len());
        let bytes: Vec<u8> = vec![0x01; 74];
        f_sink.write_all(&bytes).await.expect("send success");
        assert_eq!(f_sink.get_current_len(), 147);

        // rewind, then read everything back
        f_sink
            .mut_inner()
            .seek(SeekFrom::Start(0))
            .await
            .expect("reset to beginning");
        let mut read_buf: Vec<u8> = vec![];
        f_sink
            .mut_inner()
            .read_to_end(&mut read_buf)
            .await
            .expect("read");
        assert_eq!(read_buf.len(), 147);

        Ok(())
    }

    mod inner {

        use std::io::Error as IoError;
        use std::io::Write;

        use crate::test_async;

        use super::ensure_clean_file;
        use super::temp_dir;

        /// Baseline using the blocking std file API: both writes show up in the
        /// file metadata without an explicit flush.
        #[test_async]
        async fn test_sink_file_write_std() -> Result<(), IoError> {
            use std::fs::File;

            const WRITE_FILE: &str = "file_test_two_write_std";

            let test_file = temp_dir().join(WRITE_FILE);
            ensure_clean_file(&test_file);
            let mut f_sink = File::create(&test_file).expect("file created");

            let bytes: Vec<u8> = vec![0x01; 73];
            f_sink.write_all(&bytes).expect("send success");
            let bytes: Vec<u8> = vec![0x01; 74];
            f_sink.write_all(&bytes).expect("send success");

            let metadata = std::fs::metadata(test_file).expect("data file should exists");

            // even if file is not flushed, file has all the data
            assert_eq!(metadata.len(), 147);

            Ok(())
        }
    }
}