1use core::future::Future;
4
5use std::{
6 io::{self, SeekFrom},
7 path::PathBuf,
8 time::SystemTime,
9};
10
11use bytes::BytesMut;
12
13pub trait AsyncFs {
15 type File: ChunkRead + Meta;
16 type OpenFuture: Future<Output = io::Result<Self::File>>;
17
18 fn open(&self, path: PathBuf) -> Self::OpenFuture;
20}
21
/// Metadata accessors of an opened file.
pub trait Meta {
    /// Last modification time of the file, or `None` when the implementation has none to report.
    fn modified(&mut self) -> Option<SystemTime>;

    /// Length reported for the file.
    fn len(&self) -> u64;

    /// Returns `true` when [`Meta::len`] is zero.
    ///
    /// Deliberately marked cold and never inlined.
    #[cold]
    #[inline(never)]
    fn is_empty(&self) -> bool {
        matches!(self.len(), 0)
    }
}
36
37pub trait ChunkRead: Sized {
39 type SeekFuture<'f>: Future<Output = io::Result<()>> + 'f
40 where
41 Self: 'f;
42
43 type Future: Future<Output = io::Result<Option<(Self, BytesMut, usize)>>>;
44
45 fn seek(&mut self, pos: SeekFrom) -> Self::SeekFuture<'_>;
47
48 fn next(self, buf: BytesMut) -> Self::Future;
54}
55
56#[cfg(feature = "tokio")]
57pub(crate) use tokio_impl::TokioFs;
58
59#[cfg(feature = "tokio")]
60mod tokio_impl {
61 use core::{
62 pin::Pin,
63 task::{Context, Poll},
64 };
65
66 use tokio::{
67 fs::File,
68 io::{AsyncReadExt, AsyncSeekExt},
69 };
70
71 use super::*;
72
/// Heap-allocated, pinned, type-erased future that is `Send` and borrows for at most `'fut`.
type BoxFuture<'fut, Out> = Pin<Box<dyn Future<Output = Out> + Send + 'fut>>;
74
75 pub struct OpenFuture<F> {
76 handle: tokio::task::JoinHandle<F>,
77 }
78
79 impl<F> Future for OpenFuture<F> {
80 type Output = F;
81
82 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
83 Pin::new(&mut self.get_mut().handle).poll(cx).map(|res| res.unwrap())
84 }
85 }
86
87 #[derive(Clone)]
88 pub struct TokioFs;
89
90 impl AsyncFs for TokioFs {
91 type File = TokioFile;
92 type OpenFuture = OpenFuture<io::Result<Self::File>>;
93
94 fn open(&self, path: PathBuf) -> Self::OpenFuture {
95 OpenFuture {
96 handle: tokio::task::spawn_blocking(move || {
97 let file = std::fs::File::open(path)?;
98 let meta = file.metadata()?;
99 let modified_time = meta.modified().ok();
100 let len = meta.len();
101 Ok(TokioFile {
102 file: file.into(),
103 modified_time,
104 len,
105 })
106 }),
107 }
108 }
109 }
110
111 pub struct TokioFile {
112 file: File,
113 modified_time: Option<SystemTime>,
114 len: u64,
115 }
116
117 impl Meta for TokioFile {
118 fn modified(&mut self) -> Option<SystemTime> {
119 self.modified_time
120 }
121
122 fn len(&self) -> u64 {
123 self.len
124 }
125 }
126
127 impl ChunkRead for TokioFile {
128 type SeekFuture<'f>
129 = BoxFuture<'f, io::Result<()>>
130 where
131 Self: 'f;
132
133 type Future = BoxFuture<'static, io::Result<Option<(Self, BytesMut, usize)>>>;
134
135 fn seek(&mut self, pos: SeekFrom) -> Self::SeekFuture<'_> {
136 Box::pin(async move { self.file.seek(pos).await.map(|_| ()) })
137 }
138
139 fn next(mut self, mut buf: BytesMut) -> Self::Future {
140 Box::pin(async {
141 let n = self.file.read_buf(&mut buf).await?;
142 if n == 0 {
143 Ok(None)
144 } else {
145 Ok(Some((self, buf, n)))
146 }
147 })
148 }
149 }
150}
151
152#[cfg(feature = "tokio-uring")]
153pub(crate) use tokio_uring_impl::TokioUringFs;
154
155#[cfg(feature = "tokio-uring")]
156mod tokio_uring_impl {
157 use core::{
158 future::{ready, Ready},
159 pin::Pin,
160 };
161
162 use tokio_uring::fs::File;
163
164 use super::*;
165
/// Heap-allocated, pinned, type-erased future borrowing for at most `'fut`; no `Send` bound.
type BoxFuture<'fut, Out> = Pin<Box<dyn Future<Output = Out> + 'fut>>;
167
168 #[derive(Clone)]
169 pub struct TokioUringFs;
170
171 impl AsyncFs for TokioUringFs {
172 type File = TokioUringFile;
173 type OpenFuture = BoxFuture<'static, io::Result<Self::File>>;
174
175 fn open(&self, path: PathBuf) -> Self::OpenFuture {
176 Box::pin(async {
177 let file = File::open(path).await?;
178
179 let meta = unsafe {
181 use std::os::fd::{AsRawFd, FromRawFd};
182
183 let file = std::fs::File::from_raw_fd(file.as_raw_fd());
184 let md = file.metadata();
185 core::mem::forget(file);
188 md?
189 };
190
191 let modified_time = meta.modified().ok();
192 let len = meta.len();
193
194 Ok(TokioUringFile {
195 file,
196 pos: 0,
197 modified_time,
198 len,
199 })
200 })
201 }
202 }
203
204 pub struct TokioUringFile {
205 file: File,
206 pos: u64,
207 modified_time: Option<SystemTime>,
208 len: u64,
209 }
210
211 impl Meta for TokioUringFile {
212 fn modified(&mut self) -> Option<SystemTime> {
213 self.modified_time
214 }
215
216 fn len(&self) -> u64 {
217 self.len
218 }
219 }
220
221 impl ChunkRead for TokioUringFile {
222 type SeekFuture<'f>
223 = Ready<io::Result<()>>
224 where
225 Self: 'f;
226
227 type Future = BoxFuture<'static, io::Result<Option<(Self, BytesMut, usize)>>>;
228
229 fn seek(&mut self, pos: SeekFrom) -> Self::SeekFuture<'_> {
230 let SeekFrom::Start(pos) = pos else {
231 unreachable!("ChunkRead::seek only accept pos as SeekFrom::Start variant")
232 };
233 self.pos += pos;
234 ready(Ok(()))
235 }
236
237 fn next(mut self, buf: BytesMut) -> Self::Future {
238 Box::pin(async {
239 let (res, buf) = self.file.read_at(buf, self.pos).await;
240 let n = res?;
241 if n == 0 {
242 Ok(None)
243 } else {
244 self.pos += n as u64;
245 Ok(Some((self, buf, n)))
246 }
247 })
248 }
249 }
250}