1use std::{
3 future::Future,
4 io::{self, Read, Seek, SeekFrom},
5 path::PathBuf,
6 pin::Pin,
7 task::{Context, Poll},
8};
9
10use bytes::Bytes;
11use pin_project::pin_project;
12use tokio::{
13 io::{AsyncReadExt, AsyncWrite},
14 task::{spawn_blocking, JoinHandle},
15};
16
17use super::{make_io_error, AsyncSliceReader, AsyncSliceWriter, AsyncStreamWriter};
18use crate::AsyncStreamReader;
19
20const MAX_PREALLOC: usize = 1024 * 16;
21
22#[derive(Debug)]
24pub struct File(Option<FileAdapterFsm>);
25
26impl File {
27 pub async fn create(
29 create_file: impl FnOnce() -> io::Result<std::fs::File> + Send + 'static,
30 ) -> io::Result<Self> {
31 let inner = spawn_blocking(create_file).await.map_err(make_io_error)??;
32 Ok(Self::from_std(inner))
33 }
34
35 pub fn from_std(file: std::fs::File) -> Self {
40 Self(Some(FileAdapterFsm(file)))
41 }
42
43 pub async fn open(path: PathBuf) -> io::Result<Self> {
45 Self::create(move || std::fs::File::open(path)).await
46 }
47
48 #[cfg(test)]
50 pub fn read_contents(&self) -> Vec<u8> {
51 let mut std_file = &self.0.as_ref().unwrap().0;
52 let mut t = Vec::new();
53 std_file.rewind().unwrap();
57 std_file.read_to_end(&mut t).unwrap();
58 t
59 }
60}
61
62pub mod file {
64 use super::*;
65
66 impl AsyncSliceReader for File {
67 async fn read_at(&mut self, offset: u64, len: usize) -> io::Result<Bytes> {
68 Asyncify::from(self.0.take().map(|t| (t.read_at(offset, len), &mut self.0))).await
69 }
70
71 async fn size(&mut self) -> io::Result<u64> {
72 Asyncify::from(self.0.take().map(|t| (t.len(), &mut self.0))).await
73 }
74 }
75
76 impl AsyncSliceWriter for File {
77 async fn write_bytes_at(&mut self, offset: u64, data: Bytes) -> io::Result<()> {
78 Asyncify::from(
79 self.0
80 .take()
81 .map(|t| (t.write_bytes_at(offset, data), &mut self.0)),
82 )
83 .await
84 }
85
86 async fn write_at(&mut self, offset: u64, data: &[u8]) -> io::Result<()> {
87 Asyncify::from(
88 self.0
89 .take()
90 .map(|t| (t.write_at(offset, data), &mut self.0)),
91 )
92 .await
93 }
94
95 async fn sync(&mut self) -> io::Result<()> {
96 Asyncify::from(self.0.take().map(|t| (t.sync(), &mut self.0))).await
97 }
98
99 async fn set_len(&mut self, len: u64) -> io::Result<()> {
100 Asyncify::from(self.0.take().map(|t| (t.set_len(len), &mut self.0))).await
101 }
102 }
103}
104
105#[derive(Debug)]
108#[pin_project(project = AsyncifyProj)]
109enum Asyncify<'a, R, T> {
110 Ok(
112 #[pin] tokio::task::JoinHandle<(T, io::Result<R>)>,
113 &'a mut Option<T>,
114 ),
115 BusyErr,
117}
118
119impl<'a, R, T> From<Option<(JoinHandle<(T, io::Result<R>)>, &'a mut Option<T>)>>
120 for Asyncify<'a, R, T>
121{
122 fn from(value: Option<(JoinHandle<(T, io::Result<R>)>, &'a mut Option<T>)>) -> Self {
123 match value {
124 Some((f, h)) => Self::Ok(f, h),
125 None => Self::BusyErr,
126 }
127 }
128}
129
130impl<'a, T: 'a, R> Future for Asyncify<'a, R, T> {
131 type Output = io::Result<R>;
132 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
133 match self.project() {
134 AsyncifyProj::Ok(f, h) => f.poll(cx).map(|x| {
135 match x {
136 Ok((state, r)) => {
137 **h = Some(state);
139 r
140 }
141 Err(e) => Err(io::Error::other(e)),
142 }
143 }),
144 AsyncifyProj::BusyErr => Poll::Ready(io::Result::Err(io::Error::other(
145 "previous io op not polled to completion",
146 ))),
147 }
148 }
149}
150
151#[derive(Debug)]
156struct FileAdapterFsm(std::fs::File);
157
158impl FileAdapterFsm {
159 fn read_at(mut self, offset: u64, len: usize) -> JoinHandle<(Self, io::Result<Bytes>)> {
160 fn inner<R: std::io::Read + std::io::Seek>(
161 this: &mut R,
162 offset: u64,
163 len: usize,
164 buf: &mut Vec<u8>,
165 ) -> io::Result<()> {
166 this.seek(SeekFrom::Start(offset))?;
167 this.take(len as u64).read_to_end(buf)?;
168 Ok(())
169 }
170 spawn_blocking(move || {
171 let mut buf = Vec::with_capacity(len.min(MAX_PREALLOC));
174 let res = inner(&mut self.0, offset, len, &mut buf);
175 (self, res.map(|_| buf.into()))
176 })
177 }
178
179 fn len(mut self) -> JoinHandle<(Self, io::Result<u64>)> {
180 spawn_blocking(move || {
181 let res = self.0.seek(SeekFrom::End(0));
182 (self, res)
183 })
184 }
185}
186
187impl FileAdapterFsm {
188 fn write_bytes_at(mut self, offset: u64, data: Bytes) -> JoinHandle<(Self, io::Result<()>)> {
189 fn inner<W: std::io::Write + std::io::Seek>(
190 this: &mut W,
191 offset: u64,
192 buf: &[u8],
193 ) -> io::Result<()> {
194 this.seek(SeekFrom::Start(offset))?;
195 this.write_all(buf)?;
196 Ok(())
197 }
198 spawn_blocking(move || {
199 let res = inner(&mut self.0, offset, &data);
200 (self, res)
201 })
202 }
203
204 fn write_at(mut self, offset: u64, bytes: &[u8]) -> JoinHandle<(Self, io::Result<()>)> {
205 fn inner<W: std::io::Write + std::io::Seek>(
206 this: &mut W,
207 offset: u64,
208 buf: smallvec::SmallVec<[u8; 16]>,
209 ) -> io::Result<()> {
210 this.seek(SeekFrom::Start(offset))?;
211 this.write_all(&buf)?;
212 Ok(())
213 }
214 let t: smallvec::SmallVec<[u8; 16]> = bytes.into();
215 spawn_blocking(move || {
216 let res = inner(&mut self.0, offset, t);
217 (self, res)
218 })
219 }
220
221 fn set_len(self, len: u64) -> JoinHandle<(Self, io::Result<()>)> {
222 spawn_blocking(move || {
223 let res = self.0.set_len(len);
224 (self, res)
225 })
226 }
227
228 fn sync(self) -> JoinHandle<(Self, io::Result<()>)> {
229 spawn_blocking(move || {
230 let res = self.0.sync_all();
231 (self, res)
232 })
233 }
234}
235
236#[derive(Debug)]
238pub struct ConcatenateSliceWriter<W>(W);
239
240impl<W> ConcatenateSliceWriter<W> {
241 pub fn new(inner: W) -> Self {
243 Self(inner)
244 }
245
246 pub fn into_inner(self) -> W {
248 self.0
249 }
250}
251
252impl<W: AsyncWrite + Unpin + 'static> AsyncSliceWriter for ConcatenateSliceWriter<W> {
253 async fn write_bytes_at(&mut self, _offset: u64, data: Bytes) -> io::Result<()> {
254 self.0.write_all(&data).await
255 }
256
257 async fn write_at(&mut self, _offset: u64, bytes: &[u8]) -> io::Result<()> {
258 self.0.write_all(bytes).await
259 }
260
261 async fn sync(&mut self) -> io::Result<()> {
262 self.0.flush().await
263 }
264
265 async fn set_len(&mut self, _len: u64) -> io::Result<()> {
266 io::Result::Ok(())
267 }
268}
269
270#[derive(Debug, Clone)]
272pub struct TokioStreamWriter<T>(pub T);
273
274use tokio::io::AsyncWriteExt;
275
276impl<T: tokio::io::AsyncWrite + Unpin> AsyncStreamWriter for TokioStreamWriter<T> {
277 async fn write(&mut self, data: &[u8]) -> io::Result<()> {
278 self.0.write_all(data).await
279 }
280
281 async fn write_bytes(&mut self, data: Bytes) -> io::Result<()> {
282 self.0.write_all(&data).await
283 }
284
285 async fn sync(&mut self) -> io::Result<()> {
286 self.0.flush().await
287 }
288}
289
290#[derive(Debug, Clone)]
292pub struct TokioStreamReader<T>(pub T);
293
294impl<T> TokioStreamReader<T> {
295 pub fn new(inner: T) -> Self {
297 Self(inner)
298 }
299
300 pub fn into_inner(self) -> T {
302 self.0
303 }
304}
305
306impl<T: tokio::io::AsyncRead + Unpin> AsyncStreamReader for TokioStreamReader<T> {
307 async fn read_bytes(&mut self, len: usize) -> io::Result<Bytes> {
308 let mut buf = Vec::with_capacity(len.min(MAX_PREALLOC));
309 (&mut self.0).take(len as u64).read_to_end(&mut buf).await?;
310 Ok(buf.into())
311 }
312
313 async fn read<const L: usize>(&mut self) -> io::Result<[u8; L]> {
314 let mut buf = [0; L];
315 self.0.read_exact(&mut buf).await?;
316 Ok(buf)
317 }
318}