seekable_async_file/
lib.rs

1use async_trait::async_trait;
2use off64::chrono::Off64AsyncReadChrono;
3use off64::chrono::Off64AsyncWriteChrono;
4use off64::chrono::Off64ReadChrono;
5use off64::chrono::Off64WriteChrono;
6use off64::int::Off64AsyncReadInt;
7use off64::int::Off64AsyncWriteInt;
8use off64::int::Off64ReadInt;
9use off64::int::Off64WriteInt;
10use off64::usz;
11use off64::Off64AsyncRead;
12use off64::Off64AsyncWrite;
13use off64::Off64Read;
14use off64::Off64Write;
15use signal_future::SignalFuture;
16use signal_future::SignalFutureController;
17use std::io::SeekFrom;
18#[cfg(feature = "tokio_file")]
19use std::os::unix::prelude::FileExt;
20use std::path::Path;
21use std::sync::atomic::AtomicU64;
22use std::sync::atomic::Ordering;
23use std::sync::Arc;
24use std::time::Duration;
25use tokio::fs::File;
26use tokio::fs::OpenOptions;
27use tokio::io;
28use tokio::io::AsyncSeekExt;
29use tokio::sync::Mutex;
30use tokio::task::spawn_blocking;
31use tokio::time::sleep;
32use tokio::time::Instant;
33
34pub async fn get_file_len_via_seek(path: &Path) -> io::Result<u64> {
35  let mut file = File::open(path).await?;
36  // Note that `file.metadata().len()` is 0 for device files.
37  file.seek(SeekFrom::End(0)).await
38}
39
/// Returns the number of whole microseconds elapsed since `since`.
///
/// Panics if the count does not fit in a `u64`.
fn dur_us(since: Instant) -> u64 {
  let micros = since.elapsed().as_micros();
  u64::try_from(micros).unwrap()
}
43
/// One write in a batch: the bytes of `data` are written starting at byte `offset`.
/// A sequence of these is passed to `write_at_with_delayed_sync`.
pub struct WriteRequest<D: AsRef<[u8]> + Send + 'static> {
  data: D,
  offset: u64,
}

impl<D: AsRef<[u8]> + Send + 'static> WriteRequest<D> {
  /// Creates a request that writes `data` at byte `offset`.
  pub fn new(offset: u64, data: D) -> Self {
    WriteRequest { offset, data }
  }
}
55
56struct PendingSyncState {
57  earliest_unsynced: Option<Instant>, // Only set when first pending_sync_fut_states is created; otherwise, metrics are misleading as we'd count time when no one is waiting for a sync as delayed sync time.
58  latest_unsynced: Option<Instant>,
59  pending_sync_fut_states: Vec<SignalFutureController>,
60}
61
/// Metrics populated by a `SeekableAsyncFile`. There should be exactly one per `SeekableAsyncFile`; don't share between multiple `SeekableAsyncFile` values.
///
/// To initialise, use `SeekableAsyncFileMetrics::default()`. The values can be accessed via the thread-safe getter methods.
#[derive(Default, Debug)]
pub struct SeekableAsyncFileMetrics {
  sync_background_loops_counter: AtomicU64,
  sync_counter: AtomicU64,
  sync_delayed_counter: AtomicU64,
  sync_longest_delay_us_counter: AtomicU64,
  sync_shortest_delay_us_counter: AtomicU64,
  sync_us_counter: AtomicU64,
  write_bytes_counter: AtomicU64,
  write_counter: AtomicU64,
  write_us_counter: AtomicU64,
}

// All counters are independent monotonic totals, so `Relaxed` loads are sufficient; no ordering with other memory is implied.
impl SeekableAsyncFileMetrics {
  /// Total number of delayed sync background loop iterations.
  pub fn sync_background_loops_counter(&self) -> u64 {
    self.sync_background_loops_counter.load(Ordering::Relaxed)
  }

  /// Total number of fsync and fdatasync syscalls.
  pub fn sync_counter(&self) -> u64 {
    self.sync_counter.load(Ordering::Relaxed)
  }

  /// Total number of writes submitted via `write_at_with_delayed_sync`, i.e. writes whose sync was delayed until a later time.
  pub fn sync_delayed_counter(&self) -> u64 {
    self.sync_delayed_counter.load(Ordering::Relaxed)
  }

  /// Total number of microseconds spent waiting for a sync by one or more delayed syncs, measured from the first waiter of each batch.
  pub fn sync_longest_delay_us_counter(&self) -> u64 {
    self.sync_longest_delay_us_counter.load(Ordering::Relaxed)
  }

  /// Total number of microseconds spent waiting after the final delayed sync request of each batch before the actual sync.
  pub fn sync_shortest_delay_us_counter(&self) -> u64 {
    self.sync_shortest_delay_us_counter.load(Ordering::Relaxed)
  }

  /// Total number of microseconds spent in fsync and fdatasync syscalls.
  pub fn sync_us_counter(&self) -> u64 {
    self.sync_us_counter.load(Ordering::Relaxed)
  }

  /// Total number of bytes written.
  pub fn write_bytes_counter(&self) -> u64 {
    self.write_bytes_counter.load(Ordering::Relaxed)
  }

  /// Total number of write syscalls.
  pub fn write_counter(&self) -> u64 {
    self.write_counter.load(Ordering::Relaxed)
  }

  /// Total number of microseconds spent in write syscalls.
  pub fn write_us_counter(&self) -> u64 {
    self.write_us_counter.load(Ordering::Relaxed)
  }
}
124
125/// A `File`-like value that can perform async `read_at` and `write_at` for I/O at specific offsets without mutating any state (i.e. is thread safe). Metrics are collected, and syncs can be delayed for write batching opportunities as a performance optimisation.
126#[derive(Clone)]
127pub struct SeekableAsyncFile {
128  // Tokio has still not implemented read_at and write_at: https://github.com/tokio-rs/tokio/issues/1529. We need these to be able to share a file descriptor across threads (e.g. use from within async function).
129  // Apparently spawn_blocking is how Tokio does all file operations (as not all platforms have native async I/O), so our use is not worse but not optimised for async I/O either.
130  #[cfg(feature = "tokio_file")]
131  fd: Arc<std::fs::File>,
132  #[cfg(feature = "mmap")]
133  mmap: Arc<memmap2::MmapRaw>,
134  #[cfg(feature = "mmap")]
135  mmap_len: usize,
136  sync_delay_us: u64,
137  metrics: Arc<SeekableAsyncFileMetrics>,
138  pending_sync_state: Arc<Mutex<PendingSyncState>>,
139}
140
141impl SeekableAsyncFile {
142  /// Open a file descriptor in read and write modes, creating it if it doesn't exist. If it already exists, the contents won't be truncated.
143  ///
144  /// If the mmap feature is being used, to save a `stat` call, the size must be provided. This also allows opening non-standard files which may have a size of zero (e.g. block devices). A different size value also allows only using a portion of the beginning of the file.
145  ///
146  /// The `io_direct` and `io_dsync` parameters set the `O_DIRECT` and `O_DSYNC` flags, respectively. Unless you need those flags, provide `false`.
147  ///
148  /// Make sure to execute `start_delayed_data_sync_background_loop` in the background after this call.
149  pub async fn open(
150    path: &Path,
151    #[cfg(feature = "mmap")] size: u64,
152    metrics: Arc<SeekableAsyncFileMetrics>,
153    sync_delay: Duration,
154    flags: i32,
155  ) -> Self {
156    let async_fd = OpenOptions::new()
157      .read(true)
158      .write(true)
159      .custom_flags(flags)
160      .open(path)
161      .await
162      .unwrap();
163
164    let fd = async_fd.into_std().await;
165
166    SeekableAsyncFile {
167      #[cfg(feature = "tokio_file")]
168      fd: Arc::new(fd),
169      #[cfg(feature = "mmap")]
170      mmap: Arc::new(memmap2::MmapRaw::map_raw(&fd).unwrap()),
171      #[cfg(feature = "mmap")]
172      mmap_len: usz!(size),
173      sync_delay_us: sync_delay.as_micros().try_into().unwrap(),
174      metrics,
175      pending_sync_state: Arc::new(Mutex::new(PendingSyncState {
176        earliest_unsynced: None,
177        latest_unsynced: None,
178        pending_sync_fut_states: Vec::new(),
179      })),
180    }
181  }
182
183  #[cfg(feature = "mmap")]
184  pub unsafe fn get_mmap_raw_ptr(&self, offset: u64) -> *const u8 {
185    self.mmap.as_ptr().add(usz!(offset))
186  }
187
188  #[cfg(feature = "mmap")]
189  pub unsafe fn get_mmap_raw_mut_ptr(&self, offset: u64) -> *mut u8 {
190    self.mmap.as_mut_ptr().add(usz!(offset))
191  }
192
193  fn bump_write_metrics(&self, len: u64, call_us: u64) {
194    self
195      .metrics
196      .write_bytes_counter
197      .fetch_add(len, Ordering::Relaxed);
198    self.metrics.write_counter.fetch_add(1, Ordering::Relaxed);
199    self
200      .metrics
201      .write_us_counter
202      .fetch_add(call_us, Ordering::Relaxed);
203  }
204
205  #[cfg(feature = "mmap")]
206  pub async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
207    let offset = usz!(offset);
208    let len = usz!(len);
209    let mmap = self.mmap.clone();
210    let mmap_len = self.mmap_len;
211    spawn_blocking(move || {
212      let memory = unsafe { std::slice::from_raw_parts(mmap.as_ptr(), mmap_len) };
213      memory[offset..offset + len].to_vec()
214    })
215    .await
216    .unwrap()
217  }
218
219  #[cfg(feature = "mmap")]
220  pub fn read_at_sync(&self, offset: u64, len: u64) -> Vec<u8> {
221    let offset = usz!(offset);
222    let len = usz!(len);
223    let memory = unsafe { std::slice::from_raw_parts(self.mmap.as_ptr(), self.mmap_len) };
224    memory[offset..offset + len].to_vec()
225  }
226
227  #[cfg(feature = "tokio_file")]
228  pub async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
229    let fd = self.fd.clone();
230    spawn_blocking(move || {
231      let mut buf = vec![0u8; len.try_into().unwrap()];
232      fd.read_exact_at(&mut buf, offset).unwrap();
233      buf
234    })
235    .await
236    .unwrap()
237  }
238
239  #[cfg(feature = "mmap")]
240  pub async fn write_at<D: AsRef<[u8]> + Send + 'static>(&self, offset: u64, data: D) {
241    let offset = usz!(offset);
242    let len = data.as_ref().len();
243    let started = Instant::now();
244
245    let mmap = self.mmap.clone();
246    let mmap_len = self.mmap_len;
247    spawn_blocking(move || {
248      let memory = unsafe { std::slice::from_raw_parts_mut(mmap.as_mut_ptr(), mmap_len) };
249      memory[offset..offset + len].copy_from_slice(data.as_ref());
250    })
251    .await
252    .unwrap();
253
254    // This could be significant e.g. page fault.
255    let call_us: u64 = started.elapsed().as_micros().try_into().unwrap();
256    self.bump_write_metrics(len.try_into().unwrap(), call_us);
257  }
258
259  #[cfg(feature = "mmap")]
260  pub fn write_at_sync<D: AsRef<[u8]> + Send + 'static>(&self, offset: u64, data: D) -> () {
261    let offset = usz!(offset);
262    let len = data.as_ref().len();
263    let started = Instant::now();
264
265    let memory = unsafe { std::slice::from_raw_parts_mut(self.mmap.as_mut_ptr(), self.mmap_len) };
266    memory[offset..offset + len].copy_from_slice(data.as_ref());
267
268    // This could be significant e.g. page fault.
269    let call_us: u64 = started.elapsed().as_micros().try_into().unwrap();
270    self.bump_write_metrics(len.try_into().unwrap(), call_us);
271  }
272
273  #[cfg(feature = "tokio_file")]
274  pub async fn write_at<D: AsRef<[u8]> + Send + 'static>(&self, offset: u64, data: D) {
275    let fd = self.fd.clone();
276    let len: u64 = data.as_ref().len().try_into().unwrap();
277    let started = Instant::now();
278    spawn_blocking(move || fd.write_all_at(data.as_ref(), offset).unwrap())
279      .await
280      .unwrap();
281    // Yes, we're including the overhead of Tokio's spawn_blocking.
282    let call_us: u64 = started.elapsed().as_micros().try_into().unwrap();
283    self.bump_write_metrics(len, call_us);
284  }
285
286  pub async fn write_at_with_delayed_sync<D: AsRef<[u8]> + Send + 'static>(
287    &self,
288    writes: impl IntoIterator<Item = WriteRequest<D>>,
289  ) {
290    let mut count: u64 = 0;
291    // WARNING: We cannot make this concurrent as writes must be logically applied in order.
292    for w in writes {
293      count += 1;
294      self.write_at(w.offset, w.data).await;
295    }
296
297    let (fut, fut_ctl) = SignalFuture::new();
298
299    {
300      let mut state = self.pending_sync_state.lock().await;
301      let now = Instant::now();
302      state.earliest_unsynced.get_or_insert(now);
303      state.latest_unsynced = Some(now);
304      state.pending_sync_fut_states.push(fut_ctl.clone());
305    };
306
307    self
308      .metrics
309      .sync_delayed_counter
310      .fetch_add(count, Ordering::Relaxed);
311
312    fut.await;
313  }
314
315  pub async fn start_delayed_data_sync_background_loop(&self) {
316    // Store these outside and reuse them to avoid reallocations on each loop.
317    let mut futures_to_wake = Vec::new();
318    loop {
319      sleep(std::time::Duration::from_micros(self.sync_delay_us)).await;
320
321      struct SyncNow {
322        longest_delay_us: u64,
323        shortest_delay_us: u64,
324      }
325
326      let sync_now = {
327        let mut state = self.pending_sync_state.lock().await;
328
329        if !state.pending_sync_fut_states.is_empty() {
330          let longest_delay_us = dur_us(state.earliest_unsynced.unwrap());
331          let shortest_delay_us = dur_us(state.latest_unsynced.unwrap());
332
333          state.earliest_unsynced = None;
334          state.latest_unsynced = None;
335
336          futures_to_wake.extend(state.pending_sync_fut_states.drain(..));
337
338          Some(SyncNow {
339            longest_delay_us,
340            shortest_delay_us,
341          })
342        } else {
343          None
344        }
345      };
346
347      if let Some(SyncNow {
348        longest_delay_us,
349        shortest_delay_us,
350      }) = sync_now
351      {
352        // OPTIMISATION: Don't perform these atomic operations while unnecessarily holding up the lock.
353        self
354          .metrics
355          .sync_longest_delay_us_counter
356          .fetch_add(longest_delay_us, Ordering::Relaxed);
357        self
358          .metrics
359          .sync_shortest_delay_us_counter
360          .fetch_add(shortest_delay_us, Ordering::Relaxed);
361
362        self.sync_data().await;
363
364        for ft in futures_to_wake.drain(..) {
365          ft.signal();
366        }
367      };
368
369      self
370        .metrics
371        .sync_background_loops_counter
372        .fetch_add(1, Ordering::Relaxed);
373    }
374  }
375
376  pub async fn sync_data(&self) {
377    #[cfg(feature = "tokio_file")]
378    let fd = self.fd.clone();
379    #[cfg(feature = "mmap")]
380    let mmap = self.mmap.clone();
381
382    let started = Instant::now();
383    spawn_blocking(move || {
384      #[cfg(feature = "tokio_file")]
385      fd.sync_data().unwrap();
386
387      #[cfg(feature = "mmap")]
388      mmap.flush().unwrap();
389    })
390    .await
391    .unwrap();
392    // Yes, we're including the overhead of Tokio's spawn_blocking.
393    let sync_us: u64 = started.elapsed().as_micros().try_into().unwrap();
394    self.metrics.sync_counter.fetch_add(1, Ordering::Relaxed);
395    self
396      .metrics
397      .sync_us_counter
398      .fetch_add(sync_us, Ordering::Relaxed);
399  }
400}
401
// Synchronous offset-based read/write traits. These exist only for the mmap backend, whose `read_at_sync` and
// `write_at_sync` access the mapping directly on the calling thread.
#[cfg(feature = "mmap")]
impl<'a> Off64Read<'a, Vec<u8>> for SeekableAsyncFile {
  fn read_at(&'a self, offset: u64, len: u64) -> Vec<u8> {
    SeekableAsyncFile::read_at_sync(self, offset, len)
  }
}
#[cfg(feature = "mmap")]
impl<'a> Off64ReadChrono<'a, Vec<u8>> for SeekableAsyncFile {}
#[cfg(feature = "mmap")]
impl<'a> Off64ReadInt<'a, Vec<u8>> for SeekableAsyncFile {}

#[cfg(feature = "mmap")]
impl Off64Write for SeekableAsyncFile {
  fn write_at(&self, offset: u64, value: &[u8]) {
    // `write_at_sync` requires owned `'static` data, so copy the borrowed slice first.
    let owned = value.to_vec();
    SeekableAsyncFile::write_at_sync(self, offset, owned)
  }
}
#[cfg(feature = "mmap")]
impl Off64WriteChrono for SeekableAsyncFile {}
#[cfg(feature = "mmap")]
impl Off64WriteInt for SeekableAsyncFile {}
423
424#[async_trait]
425impl<'a> Off64AsyncRead<'a, Vec<u8>> for SeekableAsyncFile {
426  async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
427    SeekableAsyncFile::read_at(self, offset, len).await
428  }
429}
430impl<'a> Off64AsyncReadChrono<'a, Vec<u8>> for SeekableAsyncFile {}
431impl<'a> Off64AsyncReadInt<'a, Vec<u8>> for SeekableAsyncFile {}
432
433#[async_trait]
434impl Off64AsyncWrite for SeekableAsyncFile {
435  async fn write_at(&self, offset: u64, value: &[u8]) {
436    SeekableAsyncFile::write_at(self, offset, value.to_vec()).await
437  }
438}
439impl Off64AsyncWriteChrono for SeekableAsyncFile {}
440impl Off64AsyncWriteInt for SeekableAsyncFile {}