1use off64::chrono::Off64AsyncReadChrono;
2use off64::chrono::Off64AsyncWriteChrono;
3use off64::chrono::Off64ReadChrono;
4use off64::chrono::Off64WriteChrono;
5use off64::int::Off64AsyncReadInt;
6use off64::int::Off64AsyncWriteInt;
7use off64::int::Off64ReadInt;
8use off64::int::Off64WriteInt;
9use off64::usz;
10use off64::Off64AsyncRead;
11use off64::Off64AsyncWrite;
12use off64::Off64Read;
13use off64::Off64Write;
14use signal_future::SignalFuture;
15use signal_future::SignalFutureController;
16use std::io::SeekFrom;
17#[cfg(feature = "tokio_file")]
18use std::os::unix::prelude::FileExt;
19use std::path::Path;
20use std::sync::atomic::AtomicU64;
21use std::sync::atomic::Ordering;
22use std::sync::Arc;
23use std::time::Duration;
24use tokio::fs::File;
25use tokio::fs::OpenOptions;
26use tokio::io;
27use tokio::io::AsyncSeekExt;
28use tokio::sync::Mutex;
29use tokio::task::spawn_blocking;
30use tokio::time::sleep;
31use tokio::time::Instant;
32
33pub async fn get_file_len_via_seek(path: &Path) -> io::Result<u64> {
34 let mut file = File::open(path).await?;
35 file.seek(SeekFrom::End(0)).await
37}
38
/// Returns the number of whole microseconds elapsed since the instant `dur`.
///
/// Despite its name, `dur` is a starting point in time, not a duration.
///
/// # Panics
///
/// Panics if the elapsed microsecond count does not fit in a `u64`.
fn dur_us(dur: Instant) -> u64 {
  let elapsed_us: u128 = dur.elapsed().as_micros();
  u64::try_from(elapsed_us).unwrap()
}
42
/// A single pending write: a payload together with the file offset it targets.
///
/// Consumed by `SeekableAsyncFile::write_at_with_delayed_sync`, which passes
/// `offset` and `data` on to `write_at`.
pub struct WriteRequest<D>
where
  D: AsRef<[u8]> + Send + 'static,
{
  /// Bytes to write.
  data: D,
  /// Offset the bytes are written at.
  offset: u64,
}

impl<D> WriteRequest<D>
where
  D: AsRef<[u8]> + Send + 'static,
{
  /// Builds a request that writes `data` at `offset`.
  pub fn new(offset: u64, data: D) -> Self {
    WriteRequest { offset, data }
  }
}
54
55struct PendingSyncState {
56 earliest_unsynced: Option<Instant>, latest_unsynced: Option<Instant>,
58 pending_sync_fut_states: Vec<SignalFutureController>,
59}
60
/// Counters describing the write and sync activity of a `SeekableAsyncFile`.
///
/// Every field is an `AtomicU64` starting at zero (via `Default`). The getters
/// read the current value with relaxed ordering.
#[derive(Default, Debug)]
pub struct SeekableAsyncFileMetrics {
  /// Incremented once per iteration of the background sync loop.
  sync_background_loops_counter: AtomicU64,
  /// Incremented once per `sync_data` call.
  sync_counter: AtomicU64,
  /// Increased by the number of writes issued through each
  /// `write_at_with_delayed_sync` call.
  sync_delayed_counter: AtomicU64,
  /// Sum, in microseconds, of the time since the earliest unsynced
  /// registration at each background sync.
  sync_longest_delay_us_counter: AtomicU64,
  /// Sum, in microseconds, of the time since the latest unsynced registration
  /// at each background sync.
  sync_shortest_delay_us_counter: AtomicU64,
  /// Sum of the time spent in `sync_data`, in microseconds.
  sync_us_counter: AtomicU64,
  /// Sum of the byte lengths of all recorded writes.
  write_bytes_counter: AtomicU64,
  /// Number of recorded writes.
  write_counter: AtomicU64,
  /// Sum of the time spent in recorded writes, in microseconds.
  write_us_counter: AtomicU64,
}

impl SeekableAsyncFileMetrics {
  /// Reads `counter` with relaxed ordering; shared by all getters below.
  fn read_relaxed(counter: &AtomicU64) -> u64 {
    counter.load(Ordering::Relaxed)
  }

  /// Current value of the background-loop iteration counter.
  pub fn sync_background_loops_counter(&self) -> u64 {
    Self::read_relaxed(&self.sync_background_loops_counter)
  }

  /// Current value of the sync counter.
  pub fn sync_counter(&self) -> u64 {
    Self::read_relaxed(&self.sync_counter)
  }

  /// Current value of the delayed-sync write counter.
  pub fn sync_delayed_counter(&self) -> u64 {
    Self::read_relaxed(&self.sync_delayed_counter)
  }

  /// Current accumulated longest-delay total, in microseconds.
  pub fn sync_longest_delay_us_counter(&self) -> u64 {
    Self::read_relaxed(&self.sync_longest_delay_us_counter)
  }

  /// Current accumulated shortest-delay total, in microseconds.
  pub fn sync_shortest_delay_us_counter(&self) -> u64 {
    Self::read_relaxed(&self.sync_shortest_delay_us_counter)
  }

  /// Current accumulated sync time, in microseconds.
  pub fn sync_us_counter(&self) -> u64 {
    Self::read_relaxed(&self.sync_us_counter)
  }

  /// Current total of bytes written.
  pub fn write_bytes_counter(&self) -> u64 {
    Self::read_relaxed(&self.write_bytes_counter)
  }

  /// Current number of writes.
  pub fn write_counter(&self) -> u64 {
    Self::read_relaxed(&self.write_counter)
  }

  /// Current accumulated write time, in microseconds.
  pub fn write_us_counter(&self) -> u64 {
    Self::read_relaxed(&self.write_us_counter)
  }
}
123
/// Handle to an opened file supporting positioned reads/writes and delayed syncs.
///
/// Cloning is cheap: the file handle / mapping, the metrics and the pending
/// sync state are all behind `Arc`, so clones share them.
#[derive(Clone)]
pub struct SeekableAsyncFile {
  // Standard-library file handle used for positioned I/O (`tokio_file` feature).
  #[cfg(feature = "tokio_file")]
  fd: Arc<std::fs::File>,
  // Raw memory mapping of the file (`mmap` feature).
  #[cfg(feature = "mmap")]
  mmap: Arc<memmap2::MmapRaw>,
  // Length used as the bounds of the mapping when slicing it. It is taken
  // from the `size` argument of `open`, not queried from the mapping itself.
  #[cfg(feature = "mmap")]
  mmap_len: usize,
  // Sleep interval of the background sync loop, in microseconds.
  sync_delay_us: u64,
  // Shared counters updated by writes and syncs.
  metrics: Arc<SeekableAsyncFileMetrics>,
  // Writes registered via `write_at_with_delayed_sync` awaiting the next sync.
  pending_sync_state: Arc<Mutex<PendingSyncState>>,
}
139
140impl SeekableAsyncFile {
141 pub async fn open(
149 path: &Path,
150 #[cfg(feature = "mmap")] size: u64,
151 metrics: Arc<SeekableAsyncFileMetrics>,
152 sync_delay: Duration,
153 flags: i32,
154 ) -> Self {
155 let async_fd = OpenOptions::new()
156 .read(true)
157 .write(true)
158 .custom_flags(flags)
159 .open(path)
160 .await
161 .unwrap();
162
163 let fd = async_fd.into_std().await;
164
165 SeekableAsyncFile {
166 #[cfg(feature = "tokio_file")]
167 fd: Arc::new(fd),
168 #[cfg(feature = "mmap")]
169 mmap: Arc::new(memmap2::MmapRaw::map_raw(&fd).unwrap()),
170 #[cfg(feature = "mmap")]
171 mmap_len: usz!(size),
172 sync_delay_us: sync_delay.as_micros().try_into().unwrap(),
173 metrics,
174 pending_sync_state: Arc::new(Mutex::new(PendingSyncState {
175 earliest_unsynced: None,
176 latest_unsynced: None,
177 pending_sync_fut_states: Vec::new(),
178 })),
179 }
180 }
181
182 #[cfg(feature = "mmap")]
183 pub unsafe fn get_mmap_raw_ptr(&self, offset: u64) -> *const u8 {
184 self.mmap.as_ptr().add(usz!(offset))
185 }
186
187 #[cfg(feature = "mmap")]
188 pub unsafe fn get_mmap_raw_mut_ptr(&self, offset: u64) -> *mut u8 {
189 self.mmap.as_mut_ptr().add(usz!(offset))
190 }
191
192 fn bump_write_metrics(&self, len: u64, call_us: u64) {
193 self
194 .metrics
195 .write_bytes_counter
196 .fetch_add(len, Ordering::Relaxed);
197 self.metrics.write_counter.fetch_add(1, Ordering::Relaxed);
198 self
199 .metrics
200 .write_us_counter
201 .fetch_add(call_us, Ordering::Relaxed);
202 }
203
204 #[cfg(feature = "mmap")]
205 pub async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
206 let offset = usz!(offset);
207 let len = usz!(len);
208 let mmap = self.mmap.clone();
209 let mmap_len = self.mmap_len;
210 spawn_blocking(move || {
211 let memory = unsafe { std::slice::from_raw_parts(mmap.as_ptr(), mmap_len) };
212 memory[offset..offset + len].to_vec()
213 })
214 .await
215 .unwrap()
216 }
217
218 #[cfg(feature = "mmap")]
219 pub fn read_at_sync(&self, offset: u64, len: u64) -> Vec<u8> {
220 let offset = usz!(offset);
221 let len = usz!(len);
222 let memory = unsafe { std::slice::from_raw_parts(self.mmap.as_ptr(), self.mmap_len) };
223 memory[offset..offset + len].to_vec()
224 }
225
226 #[cfg(feature = "tokio_file")]
227 pub async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
228 let fd = self.fd.clone();
229 spawn_blocking(move || {
230 let mut buf = vec![0u8; len.try_into().unwrap()];
231 fd.read_exact_at(&mut buf, offset).unwrap();
232 buf
233 })
234 .await
235 .unwrap()
236 }
237
238 #[cfg(feature = "mmap")]
239 pub async fn write_at<D: AsRef<[u8]> + Send + 'static>(&self, offset: u64, data: D) {
240 let offset = usz!(offset);
241 let len = data.as_ref().len();
242 let started = Instant::now();
243
244 let mmap = self.mmap.clone();
245 let mmap_len = self.mmap_len;
246 spawn_blocking(move || {
247 let memory = unsafe { std::slice::from_raw_parts_mut(mmap.as_mut_ptr(), mmap_len) };
248 memory[offset..offset + len].copy_from_slice(data.as_ref());
249 })
250 .await
251 .unwrap();
252
253 let call_us: u64 = started.elapsed().as_micros().try_into().unwrap();
255 self.bump_write_metrics(len.try_into().unwrap(), call_us);
256 }
257
258 #[cfg(feature = "mmap")]
259 pub fn write_at_sync<D: AsRef<[u8]> + Send + 'static>(&self, offset: u64, data: D) -> () {
260 let offset = usz!(offset);
261 let len = data.as_ref().len();
262 let started = Instant::now();
263
264 let memory = unsafe { std::slice::from_raw_parts_mut(self.mmap.as_mut_ptr(), self.mmap_len) };
265 memory[offset..offset + len].copy_from_slice(data.as_ref());
266
267 let call_us: u64 = started.elapsed().as_micros().try_into().unwrap();
269 self.bump_write_metrics(len.try_into().unwrap(), call_us);
270 }
271
272 #[cfg(feature = "tokio_file")]
273 pub async fn write_at<D: AsRef<[u8]> + Send + 'static>(&self, offset: u64, data: D) {
274 let fd = self.fd.clone();
275 let len: u64 = data.as_ref().len().try_into().unwrap();
276 let started = Instant::now();
277 spawn_blocking(move || fd.write_all_at(data.as_ref(), offset).unwrap())
278 .await
279 .unwrap();
280 let call_us: u64 = started.elapsed().as_micros().try_into().unwrap();
282 self.bump_write_metrics(len, call_us);
283 }
284
285 #[cfg(any(feature = "mmap", feature = "tokio_file"))]
286 pub async fn write_at_with_delayed_sync<D: AsRef<[u8]> + Send + 'static>(
287 &self,
288 writes: impl IntoIterator<Item = WriteRequest<D>>,
289 ) {
290 let mut count: u64 = 0;
291 for w in writes {
293 count += 1;
294 self.write_at(w.offset, w.data).await;
295 }
296
297 let (fut, fut_ctl) = SignalFuture::new();
298
299 {
300 let mut state = self.pending_sync_state.lock().await;
301 let now = Instant::now();
302 state.earliest_unsynced.get_or_insert(now);
303 state.latest_unsynced = Some(now);
304 state.pending_sync_fut_states.push(fut_ctl.clone());
305 };
306
307 self
308 .metrics
309 .sync_delayed_counter
310 .fetch_add(count, Ordering::Relaxed);
311
312 fut.await;
313 }
314
315 pub async fn start_delayed_data_sync_background_loop(&self) {
316 let mut futures_to_wake = Vec::new();
318 loop {
319 sleep(std::time::Duration::from_micros(self.sync_delay_us)).await;
320
321 struct SyncNow {
322 longest_delay_us: u64,
323 shortest_delay_us: u64,
324 }
325
326 let sync_now = {
327 let mut state = self.pending_sync_state.lock().await;
328
329 if !state.pending_sync_fut_states.is_empty() {
330 let longest_delay_us = dur_us(state.earliest_unsynced.unwrap());
331 let shortest_delay_us = dur_us(state.latest_unsynced.unwrap());
332
333 state.earliest_unsynced = None;
334 state.latest_unsynced = None;
335
336 futures_to_wake.extend(state.pending_sync_fut_states.drain(..));
337
338 Some(SyncNow {
339 longest_delay_us,
340 shortest_delay_us,
341 })
342 } else {
343 None
344 }
345 };
346
347 if let Some(SyncNow {
348 longest_delay_us,
349 shortest_delay_us,
350 }) = sync_now
351 {
352 self
354 .metrics
355 .sync_longest_delay_us_counter
356 .fetch_add(longest_delay_us, Ordering::Relaxed);
357 self
358 .metrics
359 .sync_shortest_delay_us_counter
360 .fetch_add(shortest_delay_us, Ordering::Relaxed);
361
362 self.sync_data().await;
363
364 for ft in futures_to_wake.drain(..) {
365 ft.signal();
366 }
367 };
368
369 self
370 .metrics
371 .sync_background_loops_counter
372 .fetch_add(1, Ordering::Relaxed);
373 }
374 }
375
376 pub async fn sync_data(&self) {
377 #[cfg(feature = "tokio_file")]
378 let fd = self.fd.clone();
379 #[cfg(feature = "mmap")]
380 let mmap = self.mmap.clone();
381
382 let started = Instant::now();
383 spawn_blocking(move || {
384 #[cfg(feature = "tokio_file")]
385 fd.sync_data().unwrap();
386
387 #[cfg(feature = "mmap")]
388 mmap.flush().unwrap();
389 })
390 .await
391 .unwrap();
392 let sync_us: u64 = started.elapsed().as_micros().try_into().unwrap();
394 self.metrics.sync_counter.fetch_add(1, Ordering::Relaxed);
395 self
396 .metrics
397 .sync_us_counter
398 .fetch_add(sync_us, Ordering::Relaxed);
399 }
400}
401
/// Synchronous `off64` reads, available only with the `mmap` feature.
#[cfg(feature = "mmap")]
impl<'a> Off64Read<'a, Vec<u8>> for SeekableAsyncFile {
  /// Returns `len` bytes starting at `offset` by delegating to `read_at_sync`.
  fn read_at(&'a self, offset: u64, len: u64) -> Vec<u8> {
    SeekableAsyncFile::read_at_sync(self, offset, len)
  }
}
// Empty impl: relies entirely on the items the trait itself provides.
#[cfg(feature = "mmap")]
impl<'a> Off64ReadChrono<'a, Vec<u8>> for SeekableAsyncFile {}
// Empty impl: relies entirely on the items the trait itself provides.
#[cfg(feature = "mmap")]
impl<'a> Off64ReadInt<'a, Vec<u8>> for SeekableAsyncFile {}
412
/// Synchronous `off64` writes, available only with the `mmap` feature.
#[cfg(feature = "mmap")]
impl Off64Write for SeekableAsyncFile {
  /// Writes `value` at `offset` by delegating to `write_at_sync`.
  ///
  /// `value` is copied into an owned `Vec` first because `write_at_sync`
  /// requires a `'static` payload.
  fn write_at(&self, offset: u64, value: &[u8]) -> () {
    let owned = value.to_vec();
    SeekableAsyncFile::write_at_sync(self, offset, owned)
  }
}
// Empty impl: relies entirely on the items the trait itself provides.
#[cfg(feature = "mmap")]
impl Off64WriteChrono for SeekableAsyncFile {}
// Empty impl: relies entirely on the items the trait itself provides.
#[cfg(feature = "mmap")]
impl Off64WriteInt for SeekableAsyncFile {}
423
// Asynchronous `off64` reads, available with either the `mmap` or the
// `tokio_file` feature.
#[cfg(any(feature = "mmap", feature = "tokio_file"))]
impl<'a> Off64AsyncRead<'a, Vec<u8>> for SeekableAsyncFile {
  // Delegates to the inherent `SeekableAsyncFile::read_at`. The trait method
  // shares that name, so the type-qualified path spells out which one is
  // meant (inherent associated functions take priority in such paths).
  async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
    SeekableAsyncFile::read_at(self, offset, len).await
  }
}
// Empty impl: relies entirely on the items the trait itself provides.
#[cfg(any(feature = "mmap", feature = "tokio_file"))]
impl<'a> Off64AsyncReadChrono<'a, Vec<u8>> for SeekableAsyncFile {}
// Empty impl: relies entirely on the items the trait itself provides.
#[cfg(any(feature = "mmap", feature = "tokio_file"))]
impl<'a> Off64AsyncReadInt<'a, Vec<u8>> for SeekableAsyncFile {}
434
// Asynchronous `off64` writes, available with either the `mmap` or the
// `tokio_file` feature.
#[cfg(any(feature = "mmap", feature = "tokio_file"))]
impl Off64AsyncWrite for SeekableAsyncFile {
  // Delegates to the inherent `SeekableAsyncFile::write_at`. `value` is copied
  // into an owned `Vec` because the inherent method requires a `Send + 'static`
  // payload. The type-qualified path spells out that the inherent method, not
  // this same-named trait method, is the one being called.
  async fn write_at(&self, offset: u64, value: &[u8]) {
    SeekableAsyncFile::write_at(self, offset, value.to_vec()).await
  }
}
// Empty impl: relies entirely on the items the trait itself provides.
#[cfg(any(feature = "mmap", feature = "tokio_file"))]
impl Off64AsyncWriteChrono for SeekableAsyncFile {}
// Empty impl: relies entirely on the items the trait itself provides.
#[cfg(any(feature = "mmap", feature = "tokio_file"))]
impl Off64AsyncWriteInt for SeekableAsyncFile {}