1use std::{fmt::Display, os::unix::prelude::MetadataExt, str::FromStr, sync::Arc};
2
3use crate::{ByteBuffer, ByteSource, Bytes, FileErr};
4use sea_streamer_runtime::file::{
5 AsyncReadExt, AsyncSeekExt, AsyncWriteExt, File, OpenOptions, SeekFrom,
6};
7use sea_streamer_types::{
8 export::futures::{future::BoxFuture, FutureExt},
9 SeqPos, StreamUrlErr, StreamerUri,
10};
11
12pub(crate) const BUFFER_SIZE: usize = 10240;
13
14pub struct FileReader {
20 file: AsyncFile,
21 offset: u64,
23 buffer: ByteBuffer,
24}
25
26pub struct AsyncFile {
28 id: FileId,
29 file: File,
30 size: u64,
31 pos: u64,
32 buf: Vec<u8>,
33}
34
35pub type FileReaderFuture<'a> = BoxFuture<'a, Result<Bytes, FileErr>>;
36
37#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
39pub struct FileId {
40 path: Arc<String>,
41}
42
43#[derive(Debug, Copy, Clone, PartialEq, Eq)]
44pub enum ReadFrom {
45 Beginning,
46 End,
47}
48
49impl FileReader {
50 pub async fn new(file_id: FileId) -> Result<Self, FileErr> {
51 let file = AsyncFile::new_r(file_id).await?;
52 Self::new_with(file, 0, ByteBuffer::new())
53 }
54
55 pub(crate) fn new_with(
56 file: AsyncFile,
57 offset: u64,
58 buffer: ByteBuffer,
59 ) -> Result<Self, FileErr> {
60 Ok(Self {
61 file,
62 offset,
63 buffer,
64 })
65 }
66
67 pub async fn seek(&mut self, to: SeqPos) -> Result<u64, FileErr> {
71 self.offset = self.file.seek(to).await?;
72 self.buffer.clear();
73 Ok(self.offset)
74 }
75
76 #[inline]
77 pub fn offset(&self) -> u64 {
78 self.offset
79 }
80
81 #[inline]
82 pub fn file_size(&self) -> u64 {
83 self.file.size()
84 }
85
86 pub(crate) fn end(self) -> (AsyncFile, u64, ByteBuffer) {
87 (self.file, self.offset, self.buffer)
88 }
89
90 #[inline]
91 pub async fn resize(&mut self) -> Result<u64, FileErr> {
92 self.file.resize().await
93 }
94}
95
96impl ByteSource for FileReader {
97 type Future<'a> = FileReaderFuture<'a>;
98
99 fn request_bytes(&mut self, size: usize) -> Self::Future<'_> {
103 async move {
104 if self.offset + size as u64 > self.file.size() {
105 return Err(FileErr::NotEnoughBytes);
106 }
107 loop {
108 if self.buffer.size() >= size {
109 self.offset += size as u64;
110 return Ok(self.buffer.consume(size));
111 }
112 let bytes = self.file.read().await?;
113 if bytes.is_empty() {
114 return Err(FileErr::NotEnoughBytes);
115 }
116 self.buffer.append(bytes);
117 }
118 }
119 .boxed() }
121}
122
123impl AsyncFile {
124 pub async fn new_r(id: FileId) -> Result<Self, FileErr> {
126 log::debug!("AsyncFile Open ({}) Read", id.path());
127 let file = File::open(id.path()).await.map_err(FileErr::IoError)?;
128 Self::new_with(id, file).await
129 }
130
131 pub async fn new_rw(id: FileId) -> Result<Self, FileErr> {
135 log::debug!("AsyncFile Open ({}) Read/Write", id.path());
136 let mut options = OpenOptions::new();
137 options.read(true).write(true).create(true);
138 let file = options.open(id.path()).await.map_err(FileErr::IoError)?;
139 Self::new_with(id, file).await
140 }
141
142 pub async fn new_ow(id: FileId) -> Result<Self, FileErr> {
144 log::debug!("AsyncFile Open ({}) Overwrite", id.path());
145 let mut options = OpenOptions::new();
146 options.write(true).create(true).truncate(true);
147 let file = options.open(id.path()).await.map_err(FileErr::IoError)?;
148 Self::new_with(id, file).await
149 }
150
151 pub async fn new_w(id: FileId) -> Result<Self, FileErr> {
153 log::debug!("AsyncFile Create ({})", id.path());
154 let mut options = OpenOptions::new();
155 options.write(true).create_new(true);
156 let file = options.open(id.path()).await.map_err(FileErr::IoError)?;
157 Self::new_with(id, file).await
158 }
159
160 async fn new_with(id: FileId, file: File) -> Result<Self, FileErr> {
161 let size = file_size_of(&file).await?;
162 let pos = 0;
163 let buf = vec![0u8; BUFFER_SIZE];
164 Ok(Self {
165 id,
166 file,
167 size,
168 pos,
169 buf,
170 })
171 }
172
173 pub async fn read(&mut self) -> Result<Bytes, FileErr> {
175 #[cfg(feature = "runtime-async-std")]
176 if self.pos >= self.size {
177 self.file
180 .seek(SeekFrom::Start(self.pos))
181 .await
182 .map_err(FileErr::IoError)?;
183 }
184 let bytes_read = self
185 .file
186 .read(&mut self.buf)
187 .await
188 .map_err(FileErr::IoError)?;
189 let bytes = match bytes_read {
190 0 => Bytes::Empty,
191 1 => Bytes::Byte(self.buf[0]),
192 4 => Bytes::Word([self.buf[0], self.buf[1], self.buf[2], self.buf[3]]),
193 _ => {
194 let bytes = self.buf[0..bytes_read].to_vec();
195 Bytes::Bytes(bytes)
196 }
197 };
198 self.pos += bytes_read as u64;
199 self.size = std::cmp::max(self.size, self.pos);
200 Ok(bytes)
201 }
202
203 #[inline]
204 pub async fn write_all(&mut self, bytes: &[u8]) -> Result<(), FileErr> {
205 self.file.write_all(bytes).await.map_err(FileErr::IoError)
206 }
207
208 #[inline]
209 pub async fn flush(&mut self) -> Result<(), FileErr> {
210 self.file.flush().await.map_err(FileErr::IoError)
211 }
212
213 #[inline]
214 pub async fn sync_all(&mut self) -> Result<(), FileErr> {
215 self.file.sync_all().await.map_err(FileErr::IoError)
216 }
217
218 pub async fn seek(&mut self, to: SeqPos) -> Result<u64, FileErr> {
222 self.pos = self
223 .file
224 .seek(match to {
225 SeqPos::Beginning => SeekFrom::Start(0),
226 SeqPos::End => SeekFrom::End(0),
227 SeqPos::At(to) => SeekFrom::Start(to),
228 })
229 .await
230 .map_err(FileErr::IoError)?;
231 self.size = std::cmp::max(self.size, self.pos);
232 Ok(self.pos)
233 }
234
235 #[inline]
237 pub fn id(&self) -> FileId {
238 self.id.clone()
239 }
240
241 #[inline]
243 pub fn size(&self) -> u64 {
244 self.size
245 }
246
247 #[inline]
248 pub fn pos(&self) -> u64 {
249 self.pos
250 }
251
252 pub async fn resize(&mut self) -> Result<u64, FileErr> {
253 self.size = file_size_of(&self.file).await?;
254 Ok(self.size)
255 }
256}
257
258impl Drop for AsyncFile {
259 fn drop(&mut self) {
260 log::debug!("AsyncFile Close ({})", self.id.path());
261 }
262}
263
264impl FileId {
265 pub fn new<T: Into<String>>(path: T) -> Self {
266 Self {
267 path: Arc::new(path.into()),
268 }
269 }
270
271 pub fn path(&self) -> &str {
272 &self.path
273 }
274
275 pub fn to_streamer_uri(&self) -> Result<StreamerUri, StreamUrlErr> {
276 format!("file://{}", self.path()).parse()
277 }
278}
279
280impl Display for FileId {
281 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
282 write!(f, "{}", self.path)
283 }
284}
285
286impl FromStr for FileId {
287 type Err = &'static str;
288
289 fn from_str(s: &str) -> Result<Self, Self::Err> {
290 Ok(Self::new(s.to_owned()))
291 }
292}
293
294async fn file_size_of(file: &File) -> Result<u64, FileErr> {
295 Ok(file.metadata().await.map_err(FileErr::IoError)?.size())
296}