sea_streamer_file/
file.rs

1use std::{fmt::Display, os::unix::prelude::MetadataExt, str::FromStr, sync::Arc};
2
3use crate::{ByteBuffer, ByteSource, Bytes, FileErr};
4use sea_streamer_runtime::file::{
5    AsyncReadExt, AsyncSeekExt, AsyncWriteExt, File, OpenOptions, SeekFrom,
6};
7use sea_streamer_types::{
8    export::futures::{future::BoxFuture, FutureExt},
9    SeqPos, StreamUrlErr, StreamerUri,
10};
11
/// Size in bytes of the scratch buffer every `AsyncFile` allocates.
/// A single `AsyncFile::read` call returns at most this many bytes.
pub(crate) const BUFFER_SIZE: usize = 10240;
13
/// A simple buffered and bounded file reader.
/// The implementation is much simpler than `FileSource`.
///
/// `FileReader` treats a file as a fixed depot of bytes.
/// Attempting to read beyond the end results in a `NotEnoughBytes` error.
pub struct FileReader {
    /// The underlying file that bytes are pulled from.
    file: AsyncFile,
    /// This is the user's read offset, not the same as the file's read pos
    /// (the file may have been read further ahead into `buffer`).
    offset: u64,
    /// Bytes already read from `file` but not yet handed out to the user.
    buffer: ByteBuffer,
}
25
/// A minimal wrapper over async runtime's File.
pub struct AsyncFile {
    /// Identifier (path) this file was opened with.
    id: FileId,
    /// The async runtime's file handle.
    file: File,
    /// Last known size of the file in bytes. Measured on open and on `resize`,
    /// and raised whenever a read or seek moves `pos` beyond it.
    size: u64,
    /// Position in the file, as tracked from the reads and seeks made through this wrapper.
    pos: u64,
    /// Scratch buffer of `BUFFER_SIZE` bytes that `read` fills.
    buf: Vec<u8>,
}
34
/// The boxed future returned by `FileReader::request_bytes`.
/// It resolves to the requested `Bytes`, or to a `FileErr`.
pub type FileReaderFuture<'a> = BoxFuture<'a, Result<Bytes, FileErr>>;
36
/// Basically a file path.
///
/// The path string is kept behind an `Arc`, so cloning a `FileId` shares the
/// string instead of copying it.
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct FileId {
    /// The path, exactly as it was given to `FileId::new`.
    path: Arc<String>,
}
42
/// Names one of the two ends of a file.
///
/// NOTE(review): presumably selects where reading should start; it is not used
/// in this part of the file, so confirm against its callers.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum ReadFrom {
    /// The start of the file.
    Beginning,
    /// The end of the file.
    End,
}
48
49impl FileReader {
50    pub async fn new(file_id: FileId) -> Result<Self, FileErr> {
51        let file = AsyncFile::new_r(file_id).await?;
52        Self::new_with(file, 0, ByteBuffer::new())
53    }
54
55    pub(crate) fn new_with(
56        file: AsyncFile,
57        offset: u64,
58        buffer: ByteBuffer,
59    ) -> Result<Self, FileErr> {
60        Ok(Self {
61            file,
62            offset,
63            buffer,
64        })
65    }
66
67    /// Seek the file stream to a different position.
68    /// SeqNo is regarded as byte offset.
69    /// Returns the file offset after sought.
70    pub async fn seek(&mut self, to: SeqPos) -> Result<u64, FileErr> {
71        self.offset = self.file.seek(to).await?;
72        self.buffer.clear();
73        Ok(self.offset)
74    }
75
76    #[inline]
77    pub fn offset(&self) -> u64 {
78        self.offset
79    }
80
81    #[inline]
82    pub fn file_size(&self) -> u64 {
83        self.file.size()
84    }
85
86    pub(crate) fn end(self) -> (AsyncFile, u64, ByteBuffer) {
87        (self.file, self.offset, self.buffer)
88    }
89
90    #[inline]
91    pub async fn resize(&mut self) -> Result<u64, FileErr> {
92        self.file.resize().await
93    }
94}
95
96impl ByteSource for FileReader {
97    type Future<'a> = FileReaderFuture<'a>;
98
99    /// Read N bytes from file. If there is not enough bytes, it will return `NotEnoughBytes` error.
100    ///
101    /// If there are enough bytes in the buffer, it yields immediately.
102    fn request_bytes(&mut self, size: usize) -> Self::Future<'_> {
103        async move {
104            if self.offset + size as u64 > self.file.size() {
105                return Err(FileErr::NotEnoughBytes);
106            }
107            loop {
108                if self.buffer.size() >= size {
109                    self.offset += size as u64;
110                    return Ok(self.buffer.consume(size));
111                }
112                let bytes = self.file.read().await?;
113                if bytes.is_empty() {
114                    return Err(FileErr::NotEnoughBytes);
115                }
116                self.buffer.append(bytes);
117            }
118        }
119        .boxed() // sadly, there is no way to name `ReadFuture`
120    }
121}
122
123impl AsyncFile {
124    /// Open a file for Read
125    pub async fn new_r(id: FileId) -> Result<Self, FileErr> {
126        log::debug!("AsyncFile Open ({}) Read", id.path());
127        let file = File::open(id.path()).await.map_err(FileErr::IoError)?;
128        Self::new_with(id, file).await
129    }
130
131    /// Creates a new file for Read/Write.
132    /// If the file already exsits, read from the beginning.
133    /// Seek to an appropriate position to append to this file.
134    pub async fn new_rw(id: FileId) -> Result<Self, FileErr> {
135        log::debug!("AsyncFile Open ({}) Read/Write", id.path());
136        let mut options = OpenOptions::new();
137        options.read(true).write(true).create(true);
138        let file = options.open(id.path()).await.map_err(FileErr::IoError)?;
139        Self::new_with(id, file).await
140    }
141
142    /// Creates a new file for Overwrite. If the file already exists, truncate it.
143    pub async fn new_ow(id: FileId) -> Result<Self, FileErr> {
144        log::debug!("AsyncFile Open ({}) Overwrite", id.path());
145        let mut options = OpenOptions::new();
146        options.write(true).create(true).truncate(true);
147        let file = options.open(id.path()).await.map_err(FileErr::IoError)?;
148        Self::new_with(id, file).await
149    }
150
151    /// Always create a new file. If the file already exists, abort.
152    pub async fn new_w(id: FileId) -> Result<Self, FileErr> {
153        log::debug!("AsyncFile Create ({})", id.path());
154        let mut options = OpenOptions::new();
155        options.write(true).create_new(true);
156        let file = options.open(id.path()).await.map_err(FileErr::IoError)?;
157        Self::new_with(id, file).await
158    }
159
160    async fn new_with(id: FileId, file: File) -> Result<Self, FileErr> {
161        let size = file_size_of(&file).await?;
162        let pos = 0;
163        let buf = vec![0u8; BUFFER_SIZE];
164        Ok(Self {
165            id,
166            file,
167            size,
168            pos,
169            buf,
170        })
171    }
172
173    /// Read up to `BUFFER_SIZE` amount of bytes.
174    pub async fn read(&mut self) -> Result<Bytes, FileErr> {
175        #[cfg(feature = "runtime-async-std")]
176        if self.pos >= self.size {
177            // Not sure why, there must be a subtle implementation difference.
178            // This is needed only on async-std, when the file grows.
179            self.file
180                .seek(SeekFrom::Start(self.pos))
181                .await
182                .map_err(FileErr::IoError)?;
183        }
184        let bytes_read = self
185            .file
186            .read(&mut self.buf)
187            .await
188            .map_err(FileErr::IoError)?;
189        let bytes = match bytes_read {
190            0 => Bytes::Empty,
191            1 => Bytes::Byte(self.buf[0]),
192            4 => Bytes::Word([self.buf[0], self.buf[1], self.buf[2], self.buf[3]]),
193            _ => {
194                let bytes = self.buf[0..bytes_read].to_vec();
195                Bytes::Bytes(bytes)
196            }
197        };
198        self.pos += bytes_read as u64;
199        self.size = std::cmp::max(self.size, self.pos);
200        Ok(bytes)
201    }
202
203    #[inline]
204    pub async fn write_all(&mut self, bytes: &[u8]) -> Result<(), FileErr> {
205        self.file.write_all(bytes).await.map_err(FileErr::IoError)
206    }
207
208    #[inline]
209    pub async fn flush(&mut self) -> Result<(), FileErr> {
210        self.file.flush().await.map_err(FileErr::IoError)
211    }
212
213    #[inline]
214    pub async fn sync_all(&mut self) -> Result<(), FileErr> {
215        self.file.sync_all().await.map_err(FileErr::IoError)
216    }
217
218    /// Seek the file stream to a different position.
219    /// SeqNo is regarded as byte offset.
220    /// Returns the file position after sought.
221    pub async fn seek(&mut self, to: SeqPos) -> Result<u64, FileErr> {
222        self.pos = self
223            .file
224            .seek(match to {
225                SeqPos::Beginning => SeekFrom::Start(0),
226                SeqPos::End => SeekFrom::End(0),
227                SeqPos::At(to) => SeekFrom::Start(to),
228            })
229            .await
230            .map_err(FileErr::IoError)?;
231        self.size = std::cmp::max(self.size, self.pos);
232        Ok(self.pos)
233    }
234
235    /// Get the `FileId`.
236    #[inline]
237    pub fn id(&self) -> FileId {
238        self.id.clone()
239    }
240
241    /// Get the file's size. This updates only when the file is read or sought.
242    #[inline]
243    pub fn size(&self) -> u64 {
244        self.size
245    }
246
247    #[inline]
248    pub fn pos(&self) -> u64 {
249        self.pos
250    }
251
252    pub async fn resize(&mut self) -> Result<u64, FileErr> {
253        self.size = file_size_of(&self.file).await?;
254        Ok(self.size)
255    }
256}
257
258impl Drop for AsyncFile {
259    fn drop(&mut self) {
260        log::debug!("AsyncFile Close ({})", self.id.path());
261    }
262}
263
264impl FileId {
265    pub fn new<T: Into<String>>(path: T) -> Self {
266        Self {
267            path: Arc::new(path.into()),
268        }
269    }
270
271    pub fn path(&self) -> &str {
272        &self.path
273    }
274
275    pub fn to_streamer_uri(&self) -> Result<StreamerUri, StreamUrlErr> {
276        format!("file://{}", self.path()).parse()
277    }
278}
279
280impl Display for FileId {
281    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282        write!(f, "{}", self.path)
283    }
284}
285
286impl FromStr for FileId {
287    type Err = &'static str;
288
289    fn from_str(s: &str) -> Result<Self, Self::Err> {
290        Ok(Self::new(s.to_owned()))
291    }
292}
293
294async fn file_size_of(file: &File) -> Result<u64, FileErr> {
295    Ok(file.metadata().await.map_err(FileErr::IoError)?.size())
296}