use async_trait::async_trait;
use off64::chrono::Off64AsyncReadChrono;
use off64::chrono::Off64AsyncWriteChrono;
use off64::chrono::Off64ReadChrono;
use off64::chrono::Off64WriteChrono;
use off64::int::Off64AsyncReadInt;
use off64::int::Off64AsyncWriteInt;
use off64::int::Off64ReadInt;
use off64::int::Off64WriteInt;
use off64::usz;
use off64::Off64AsyncRead;
use off64::Off64AsyncWrite;
use off64::Off64Read;
use off64::Off64Write;
use signal_future::SignalFuture;
use signal_future::SignalFutureController;
use std::io::SeekFrom;
#[cfg(feature = "tokio_file")]
use std::os::unix::prelude::FileExt;
use std::path::Path;
use std::sync::atomic::AtomicU64;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::fs::File;
use tokio::fs::OpenOptions;
use tokio::io;
use tokio::io::AsyncSeekExt;
use tokio::sync::Mutex;
use tokio::task::spawn_blocking;
use tokio::time::sleep;
use tokio::time::Instant;

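/// Returns the length of the file at `path`, determined by seeking to its end.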
pub async fn get_file_len_via_seek(path: &Path) -> io::Result<u64> {
  let mut file = File::open(path).await?;
  file.seek(SeekFrom::End(0)).await
}

fn dur_us(since: Instant) -> u64 {
  since.elapsed().as_micros().try_into().unwrap()
}

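/// A single write of `data` starting at byte `offset`, for use with
/// [`SeekableAsyncFile::write_at_with_delayed_sync`].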
pub struct WriteRequest<D: AsRef<[u8]> + Send + 'static> {
  data: D,
  offset: u64,
}

impl<D: AsRef<[u8]> + Send + 'static> WriteRequest<D> {
  pub fn new(offset: u64, data: D) -> Self {
    Self { data, offset }
  }
}

struct PendingSyncState {
  earliest_unsynced: Option<Instant>,
  latest_unsynced: Option<Instant>,
  pending_sync_fut_states: Vec<SignalFutureController>,
}

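/// Counters describing write and sync activity on a [`SeekableAsyncFile`].
/// All fields are atomic, so a single instance can be shared via `Arc` across
/// clones of the file handle and read at any time through the getter methods.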
#[derive(Default, Debug)]
pub struct SeekableAsyncFileMetrics {
  sync_background_loops_counter: AtomicU64,
  sync_counter: AtomicU64,
  sync_delayed_counter: AtomicU64,
  sync_longest_delay_us_counter: AtomicU64,
  sync_shortest_delay_us_counter: AtomicU64,
  sync_us_counter: AtomicU64,
  write_bytes_counter: AtomicU64,
  write_counter: AtomicU64,
  write_us_counter: AtomicU64,
}

impl SeekableAsyncFileMetrics {
  pub fn sync_background_loops_counter(&self) -> u64 {
    self.sync_background_loops_counter.load(Ordering::Relaxed)
  }

  pub fn sync_counter(&self) -> u64 {
    self.sync_counter.load(Ordering::Relaxed)
  }

  pub fn sync_delayed_counter(&self) -> u64 {
    self.sync_delayed_counter.load(Ordering::Relaxed)
  }

  pub fn sync_longest_delay_us_counter(&self) -> u64 {
    self.sync_longest_delay_us_counter.load(Ordering::Relaxed)
  }

  pub fn sync_shortest_delay_us_counter(&self) -> u64 {
    self.sync_shortest_delay_us_counter.load(Ordering::Relaxed)
  }

  pub fn sync_us_counter(&self) -> u64 {
    self.sync_us_counter.load(Ordering::Relaxed)
  }

  pub fn write_bytes_counter(&self) -> u64 {
    self.write_bytes_counter.load(Ordering::Relaxed)
  }

  pub fn write_counter(&self) -> u64 {
    self.write_counter.load(Ordering::Relaxed)
  }

  pub fn write_us_counter(&self) -> u64 {
    self.write_us_counter.load(Ordering::Relaxed)
  }
}

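/// A file handle that supports positioned reads and writes plus batched,
/// delayed data syncs. All shared state lives behind `Arc`, so cloning is
/// cheap and clones can be used concurrently from multiple tasks. Backed by
/// either a raw `std::fs::File` (`tokio_file` feature) or a memory map
/// (`mmap` feature).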
#[derive(Clone)]
pub struct SeekableAsyncFile {
  #[cfg(feature = "tokio_file")]
  fd: Arc<std::fs::File>,
  #[cfg(feature = "mmap")]
  mmap: Arc<memmap2::MmapRaw>,
  #[cfg(feature = "mmap")]
  mmap_len: usize,
  sync_delay_us: u64,
  metrics: Arc<SeekableAsyncFileMetrics>,
  pending_sync_state: Arc<Mutex<PendingSyncState>>,
}

impl SeekableAsyncFile {
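  /// Opens `path` for reading and writing, applying `flags` as extra
  /// `open(2)` flags. Delayed syncs are driven by a background loop that runs
  /// every `sync_delay`; see
  /// [`start_delayed_data_sync_background_loop`](Self::start_delayed_data_sync_background_loop).
  ///
  /// A minimal usage sketch, assuming the `mmap` feature and an existing,
  /// preallocated file (the path and sizes here are illustrative only):
  ///
  /// ```ignore
  /// let metrics = Arc::new(SeekableAsyncFileMetrics::default());
  /// let file = SeekableAsyncFile::open(
  ///   Path::new("/var/lib/myapp/data"),
  ///   1024 * 1024,                // Mapped length; parameter exists only with `mmap`.
  ///   metrics.clone(),
  ///   Duration::from_micros(100), // How long writes may wait before a sync.
  ///   0,                          // No extra open(2) flags.
  /// )
  /// .await;
  ///
  /// // The background loop must be running for delayed syncs to complete.
  /// tokio::spawn({
  ///   let file = file.clone();
  ///   async move { file.start_delayed_data_sync_background_loop().await }
  /// });
  /// ```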
  pub async fn open(
    path: &Path,
    #[cfg(feature = "mmap")] size: u64,
    metrics: Arc<SeekableAsyncFileMetrics>,
    sync_delay: Duration,
    flags: i32,
  ) -> Self {
    let async_fd = OpenOptions::new()
      .read(true)
      .write(true)
      .custom_flags(flags)
      .open(path)
      .await
      .unwrap();

    let fd = async_fd.into_std().await;

    SeekableAsyncFile {
      #[cfg(feature = "tokio_file")]
      fd: Arc::new(fd),
      #[cfg(feature = "mmap")]
      mmap: Arc::new(memmap2::MmapRaw::map_raw(&fd).unwrap()),
      #[cfg(feature = "mmap")]
      mmap_len: usz!(size),
      sync_delay_us: sync_delay.as_micros().try_into().unwrap(),
      metrics,
      pending_sync_state: Arc::new(Mutex::new(PendingSyncState {
        earliest_unsynced: None,
        latest_unsynced: None,
        pending_sync_fut_states: Vec::new(),
      })),
    }
  }

  #[cfg(feature = "mmap")]
  pub unsafe fn get_mmap_raw_ptr(&self, offset: u64) -> *const u8 {
    self.mmap.as_ptr().add(usz!(offset))
  }

  #[cfg(feature = "mmap")]
  pub unsafe fn get_mmap_raw_mut_ptr(&self, offset: u64) -> *mut u8 {
    self.mmap.as_mut_ptr().add(usz!(offset))
  }

  fn bump_write_metrics(&self, len: u64, call_us: u64) {
    self
      .metrics
      .write_bytes_counter
      .fetch_add(len, Ordering::Relaxed);
    self.metrics.write_counter.fetch_add(1, Ordering::Relaxed);
    self
      .metrics
      .write_us_counter
      .fetch_add(call_us, Ordering::Relaxed);
  }

  #[cfg(feature = "mmap")]
  pub async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
    let offset = usz!(offset);
    let len = usz!(len);
    let mmap = self.mmap.clone();
    let mmap_len = self.mmap_len;
    spawn_blocking(move || {
      let memory = unsafe { std::slice::from_raw_parts(mmap.as_ptr(), mmap_len) };
      memory[offset..offset + len].to_vec()
    })
    .await
    .unwrap()
  }

  #[cfg(feature = "mmap")]
  pub fn read_at_sync(&self, offset: u64, len: u64) -> Vec<u8> {
    let offset = usz!(offset);
    let len = usz!(len);
    let memory = unsafe { std::slice::from_raw_parts(self.mmap.as_ptr(), self.mmap_len) };
    memory[offset..offset + len].to_vec()
  }

  #[cfg(feature = "tokio_file")]
  pub async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
    let fd = self.fd.clone();
    spawn_blocking(move || {
      let mut buf = vec![0u8; len.try_into().unwrap()];
      fd.read_exact_at(&mut buf, offset).unwrap();
      buf
    })
    .await
    .unwrap()
  }

  #[cfg(feature = "mmap")]
  pub async fn write_at<D: AsRef<[u8]> + Send + 'static>(&self, offset: u64, data: D) {
    let offset = usz!(offset);
    let len = data.as_ref().len();
    let started = Instant::now();

    let mmap = self.mmap.clone();
    let mmap_len = self.mmap_len;
    spawn_blocking(move || {
      let memory = unsafe { std::slice::from_raw_parts_mut(mmap.as_mut_ptr(), mmap_len) };
      memory[offset..offset + len].copy_from_slice(data.as_ref());
    })
    .await
    .unwrap();

    let call_us: u64 = started.elapsed().as_micros().try_into().unwrap();
    self.bump_write_metrics(len.try_into().unwrap(), call_us);
  }

  #[cfg(feature = "mmap")]
  pub fn write_at_sync<D: AsRef<[u8]> + Send + 'static>(&self, offset: u64, data: D) {
    let offset = usz!(offset);
    let len = data.as_ref().len();
    let started = Instant::now();

    let memory = unsafe { std::slice::from_raw_parts_mut(self.mmap.as_mut_ptr(), self.mmap_len) };
    memory[offset..offset + len].copy_from_slice(data.as_ref());

    let call_us: u64 = started.elapsed().as_micros().try_into().unwrap();
    self.bump_write_metrics(len.try_into().unwrap(), call_us);
  }

  #[cfg(feature = "tokio_file")]
  pub async fn write_at<D: AsRef<[u8]> + Send + 'static>(&self, offset: u64, data: D) {
    let fd = self.fd.clone();
    let len: u64 = data.as_ref().len().try_into().unwrap();
    let started = Instant::now();
    spawn_blocking(move || fd.write_all_at(data.as_ref(), offset).unwrap())
      .await
      .unwrap();
    let call_us: u64 = started.elapsed().as_micros().try_into().unwrap();
    self.bump_write_metrics(len, call_us);
  }

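  /// Performs each write in `writes`, then waits until the data has been
  /// synced by the delayed-sync background loop. The returned future only
  /// resolves after
  /// [`start_delayed_data_sync_background_loop`](Self::start_delayed_data_sync_background_loop)
  /// performs a sync covering these writes, so that loop must be running.
  ///
  /// A small sketch (offsets and payloads are illustrative only):
  ///
  /// ```ignore
  /// file
  ///   .write_at_with_delayed_sync(vec![
  ///     WriteRequest::new(0, vec![1u8, 2, 3]),
  ///     WriteRequest::new(512, b"hello".to_vec()),
  ///   ])
  ///   .await; // Resolves once the background loop has synced.
  /// ```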
  pub async fn write_at_with_delayed_sync<D: AsRef<[u8]> + Send + 'static>(
    &self,
    writes: impl IntoIterator<Item = WriteRequest<D>>,
  ) {
    let mut count: u64 = 0;
    for w in writes {
      count += 1;
      self.write_at(w.offset, w.data).await;
    }

    let (fut, fut_ctl) = SignalFuture::new();

    {
      let mut state = self.pending_sync_state.lock().await;
      let now = Instant::now();
      state.earliest_unsynced.get_or_insert(now);
      state.latest_unsynced = Some(now);
      state.pending_sync_fut_states.push(fut_ctl.clone());
    }

    self
      .metrics
      .sync_delayed_counter
      .fetch_add(count, Ordering::Relaxed);

    fut.await;
  }

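  /// Runs the delayed-sync loop: every `sync_delay` it checks for pending
  /// delayed writes, syncs the file once on their behalf, and wakes every
  /// future returned by [`write_at_with_delayed_sync`](Self::write_at_with_delayed_sync).
  /// This future never resolves, so spawn it as its own task, e.g.
  /// `tokio::spawn({ let f = file.clone(); async move { f.start_delayed_data_sync_background_loop().await } })`.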
  pub async fn start_delayed_data_sync_background_loop(&self) {
    let mut futures_to_wake = Vec::new();
    loop {
      sleep(Duration::from_micros(self.sync_delay_us)).await;

      struct SyncNow {
        longest_delay_us: u64,
        shortest_delay_us: u64,
      }

      let sync_now = {
        let mut state = self.pending_sync_state.lock().await;

        if !state.pending_sync_fut_states.is_empty() {
          let longest_delay_us = dur_us(state.earliest_unsynced.unwrap());
          let shortest_delay_us = dur_us(state.latest_unsynced.unwrap());

          state.earliest_unsynced = None;
          state.latest_unsynced = None;

          futures_to_wake.extend(state.pending_sync_fut_states.drain(..));

          Some(SyncNow {
            longest_delay_us,
            shortest_delay_us,
          })
        } else {
          None
        }
      };

      if let Some(SyncNow {
        longest_delay_us,
        shortest_delay_us,
      }) = sync_now
      {
        self
          .metrics
          .sync_longest_delay_us_counter
          .fetch_add(longest_delay_us, Ordering::Relaxed);
        self
          .metrics
          .sync_shortest_delay_us_counter
          .fetch_add(shortest_delay_us, Ordering::Relaxed);

        self.sync_data().await;

        for ft in futures_to_wake.drain(..) {
          ft.signal();
        }
      }

      self
        .metrics
        .sync_background_loops_counter
        .fetch_add(1, Ordering::Relaxed);
    }
  }

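  /// Forces a data sync now: `File::sync_data` (fdatasync on Linux) with the
  /// `tokio_file` feature, or a flush of the memory map with the `mmap`
  /// feature. Runs on a blocking thread and records sync metrics.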
  pub async fn sync_data(&self) {
    #[cfg(feature = "tokio_file")]
    let fd = self.fd.clone();
    #[cfg(feature = "mmap")]
    let mmap = self.mmap.clone();

    let started = Instant::now();
    spawn_blocking(move || {
      #[cfg(feature = "tokio_file")]
      fd.sync_data().unwrap();

      #[cfg(feature = "mmap")]
      mmap.flush().unwrap();
    })
    .await
    .unwrap();
    let sync_us: u64 = started.elapsed().as_micros().try_into().unwrap();
    self.metrics.sync_counter.fetch_add(1, Ordering::Relaxed);
    self
      .metrics
      .sync_us_counter
      .fetch_add(sync_us, Ordering::Relaxed);
  }
}

#[cfg(feature = "mmap")]
impl<'a> Off64Read<'a, Vec<u8>> for SeekableAsyncFile {
  fn read_at(&'a self, offset: u64, len: u64) -> Vec<u8> {
    self.read_at_sync(offset, len)
  }
}
#[cfg(feature = "mmap")]
impl<'a> Off64ReadChrono<'a, Vec<u8>> for SeekableAsyncFile {}
#[cfg(feature = "mmap")]
impl<'a> Off64ReadInt<'a, Vec<u8>> for SeekableAsyncFile {}

#[cfg(feature = "mmap")]
impl Off64Write for SeekableAsyncFile {
  fn write_at(&self, offset: u64, value: &[u8]) {
    self.write_at_sync(offset, value.to_vec())
  }
}
#[cfg(feature = "mmap")]
impl Off64WriteChrono for SeekableAsyncFile {}
#[cfg(feature = "mmap")]
impl Off64WriteInt for SeekableAsyncFile {}

#[async_trait]
impl<'a> Off64AsyncRead<'a, Vec<u8>> for SeekableAsyncFile {
  async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
    SeekableAsyncFile::read_at(self, offset, len).await
  }
}
impl<'a> Off64AsyncReadChrono<'a, Vec<u8>> for SeekableAsyncFile {}
impl<'a> Off64AsyncReadInt<'a, Vec<u8>> for SeekableAsyncFile {}

#[async_trait]
impl Off64AsyncWrite for SeekableAsyncFile {
  async fn write_at(&self, offset: u64, value: &[u8]) {
    SeekableAsyncFile::write_at(self, offset, value.to_vec()).await
  }
}
impl Off64AsyncWriteChrono for SeekableAsyncFile {}
impl Off64AsyncWriteInt for SeekableAsyncFile {}
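
// With the impls above, a SeekableAsyncFile can be passed anywhere the off64
// traits are expected, including the typed integer helpers provided by
// Off64ReadInt/Off64AsyncReadInt and their write counterparts. A rough sketch,
// assuming off64's little-endian u64 helpers are named `write_u64_le_at` and
// `read_u64_le_at` (check the off64 docs for the exact method names):
//
//   file.write_u64_le_at(0, 42).await;
//   let value = file.read_u64_le_at(0).await;
//   assert_eq!(value, 42);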