seekable_async_file/
lib.rs

1use off64::chrono::Off64AsyncReadChrono;
2use off64::chrono::Off64AsyncWriteChrono;
3use off64::chrono::Off64ReadChrono;
4use off64::chrono::Off64WriteChrono;
5use off64::int::Off64AsyncReadInt;
6use off64::int::Off64AsyncWriteInt;
7use off64::int::Off64ReadInt;
8use off64::int::Off64WriteInt;
9use off64::usz;
10use off64::Off64AsyncRead;
11use off64::Off64AsyncWrite;
12use off64::Off64Read;
13use off64::Off64Write;
14use signal_future::SignalFuture;
15use signal_future::SignalFutureController;
16use std::io::SeekFrom;
17#[cfg(feature = "tokio_file")]
18use std::os::unix::prelude::FileExt;
19use std::path::Path;
20use std::sync::atomic::AtomicU64;
21use std::sync::atomic::Ordering;
22use std::sync::Arc;
23use std::time::Duration;
24use tokio::fs::File;
25use tokio::fs::OpenOptions;
26use tokio::io;
27use tokio::io::AsyncSeekExt;
28use tokio::sync::Mutex;
29use tokio::task::spawn_blocking;
30use tokio::time::sleep;
31use tokio::time::Instant;
32
33pub async fn get_file_len_via_seek(path: &Path) -> io::Result<u64> {
34  let mut file = File::open(path).await?;
35  // Note that `file.metadata().len()` is 0 for device files.
36  file.seek(SeekFrom::End(0)).await
37}
38
/// Number of whole microseconds that have elapsed since the instant `dur`.
///
/// # Panics
///
/// Panics if the elapsed microsecond count does not fit in a `u64`.
fn dur_us(dur: Instant) -> u64 {
  let elapsed_us = dur.elapsed().as_micros();
  elapsed_us.try_into().unwrap()
}
42
/// A single pending write: the bytes to store and the offset to store them at. Values of this type are provided to `write_at_with_delayed_sync`.
pub struct WriteRequest<D: AsRef<[u8]> + Send + 'static> {
  /// Bytes to be written.
  data: D,
  /// Offset at which `data` is written.
  offset: u64,
}

impl<D: AsRef<[u8]> + Send + 'static> WriteRequest<D> {
  /// Build a request that writes `data` at `offset`. Note the argument order: offset first, then data.
  pub fn new(offset: u64, data: D) -> Self {
    WriteRequest { offset, data }
  }
}
54
55struct PendingSyncState {
56  earliest_unsynced: Option<Instant>, // Only set when first pending_sync_fut_states is created; otherwise, metrics are misleading as we'd count time when no one is waiting for a sync as delayed sync time.
57  latest_unsynced: Option<Instant>,
58  pending_sync_fut_states: Vec<SignalFutureController>,
59}
60
/// Metrics populated by a `SeekableAsyncFile`. There should be exactly one per `SeekableAsyncFile`; don't share between multiple `SeekableAsyncFile` values.
///
/// To initialise, use `SeekableAsyncFileMetrics::default()`. The values can be accessed via the thread-safe getter methods.
#[derive(Default, Debug)]
pub struct SeekableAsyncFileMetrics {
  sync_background_loops_counter: AtomicU64,
  sync_counter: AtomicU64,
  sync_delayed_counter: AtomicU64,
  sync_longest_delay_us_counter: AtomicU64,
  sync_shortest_delay_us_counter: AtomicU64,
  sync_us_counter: AtomicU64,
  write_bytes_counter: AtomicU64,
  write_counter: AtomicU64,
  write_us_counter: AtomicU64,
}

impl SeekableAsyncFileMetrics {
  /// Read the current value of `counter` using relaxed ordering; every public getter goes through this.
  fn peek(counter: &AtomicU64) -> u64 {
    counter.load(Ordering::Relaxed)
  }

  /// Total number of delayed sync background loop iterations.
  pub fn sync_background_loops_counter(&self) -> u64 {
    Self::peek(&self.sync_background_loops_counter)
  }

  /// Total number of fsync and fdatasync syscalls.
  pub fn sync_counter(&self) -> u64 {
    Self::peek(&self.sync_counter)
  }

  /// Total number of requested syncs that were delayed until a later time.
  pub fn sync_delayed_counter(&self) -> u64 {
    Self::peek(&self.sync_delayed_counter)
  }

  /// Total number of microseconds spent waiting for a sync by one or more delayed syncs.
  pub fn sync_longest_delay_us_counter(&self) -> u64 {
    Self::peek(&self.sync_longest_delay_us_counter)
  }

  /// Total number of microseconds spent waiting after a final delayed sync before the actual sync.
  pub fn sync_shortest_delay_us_counter(&self) -> u64 {
    Self::peek(&self.sync_shortest_delay_us_counter)
  }

  /// Total number of microseconds spent in fsync and fdatasync syscalls.
  pub fn sync_us_counter(&self) -> u64 {
    Self::peek(&self.sync_us_counter)
  }

  /// Total number of bytes written.
  pub fn write_bytes_counter(&self) -> u64 {
    Self::peek(&self.write_bytes_counter)
  }

  /// Total number of write syscalls.
  pub fn write_counter(&self) -> u64 {
    Self::peek(&self.write_counter)
  }

  /// Total number of microseconds spent in write syscalls.
  pub fn write_us_counter(&self) -> u64 {
    Self::peek(&self.write_us_counter)
  }
}
123
124/// A `File`-like value that can perform async `read_at` and `write_at` for I/O at specific offsets without mutating any state (i.e. is thread safe). Metrics are collected, and syncs can be delayed for write batching opportunities as a performance optimisation.
125#[derive(Clone)]
126pub struct SeekableAsyncFile {
127  // Tokio has still not implemented read_at and write_at: https://github.com/tokio-rs/tokio/issues/1529. We need these to be able to share a file descriptor across threads (e.g. use from within async function).
128  // Apparently spawn_blocking is how Tokio does all file operations (as not all platforms have native async I/O), so our use is not worse but not optimised for async I/O either.
129  #[cfg(feature = "tokio_file")]
130  fd: Arc<std::fs::File>,
131  #[cfg(feature = "mmap")]
132  mmap: Arc<memmap2::MmapRaw>,
133  #[cfg(feature = "mmap")]
134  mmap_len: usize,
135  sync_delay_us: u64,
136  metrics: Arc<SeekableAsyncFileMetrics>,
137  pending_sync_state: Arc<Mutex<PendingSyncState>>,
138}
139
140impl SeekableAsyncFile {
141  /// Open a file descriptor in read and write modes, creating it if it doesn't exist. If it already exists, the contents won't be truncated.
142  ///
143  /// If the mmap feature is being used, to save a `stat` call, the size must be provided. This also allows opening non-standard files which may have a size of zero (e.g. block devices). A different size value also allows only using a portion of the beginning of the file.
144  ///
145  /// The `io_direct` and `io_dsync` parameters set the `O_DIRECT` and `O_DSYNC` flags, respectively. Unless you need those flags, provide `false`.
146  ///
147  /// Make sure to execute `start_delayed_data_sync_background_loop` in the background after this call.
148  pub async fn open(
149    path: &Path,
150    #[cfg(feature = "mmap")] size: u64,
151    metrics: Arc<SeekableAsyncFileMetrics>,
152    sync_delay: Duration,
153    flags: i32,
154  ) -> Self {
155    let async_fd = OpenOptions::new()
156      .read(true)
157      .write(true)
158      .custom_flags(flags)
159      .open(path)
160      .await
161      .unwrap();
162
163    let fd = async_fd.into_std().await;
164
165    SeekableAsyncFile {
166      #[cfg(feature = "tokio_file")]
167      fd: Arc::new(fd),
168      #[cfg(feature = "mmap")]
169      mmap: Arc::new(memmap2::MmapRaw::map_raw(&fd).unwrap()),
170      #[cfg(feature = "mmap")]
171      mmap_len: usz!(size),
172      sync_delay_us: sync_delay.as_micros().try_into().unwrap(),
173      metrics,
174      pending_sync_state: Arc::new(Mutex::new(PendingSyncState {
175        earliest_unsynced: None,
176        latest_unsynced: None,
177        pending_sync_fut_states: Vec::new(),
178      })),
179    }
180  }
181
182  #[cfg(feature = "mmap")]
183  pub unsafe fn get_mmap_raw_ptr(&self, offset: u64) -> *const u8 {
184    self.mmap.as_ptr().add(usz!(offset))
185  }
186
187  #[cfg(feature = "mmap")]
188  pub unsafe fn get_mmap_raw_mut_ptr(&self, offset: u64) -> *mut u8 {
189    self.mmap.as_mut_ptr().add(usz!(offset))
190  }
191
192  fn bump_write_metrics(&self, len: u64, call_us: u64) {
193    self
194      .metrics
195      .write_bytes_counter
196      .fetch_add(len, Ordering::Relaxed);
197    self.metrics.write_counter.fetch_add(1, Ordering::Relaxed);
198    self
199      .metrics
200      .write_us_counter
201      .fetch_add(call_us, Ordering::Relaxed);
202  }
203
204  #[cfg(feature = "mmap")]
205  pub async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
206    let offset = usz!(offset);
207    let len = usz!(len);
208    let mmap = self.mmap.clone();
209    let mmap_len = self.mmap_len;
210    spawn_blocking(move || {
211      let memory = unsafe { std::slice::from_raw_parts(mmap.as_ptr(), mmap_len) };
212      memory[offset..offset + len].to_vec()
213    })
214    .await
215    .unwrap()
216  }
217
218  #[cfg(feature = "mmap")]
219  pub fn read_at_sync(&self, offset: u64, len: u64) -> Vec<u8> {
220    let offset = usz!(offset);
221    let len = usz!(len);
222    let memory = unsafe { std::slice::from_raw_parts(self.mmap.as_ptr(), self.mmap_len) };
223    memory[offset..offset + len].to_vec()
224  }
225
226  #[cfg(feature = "tokio_file")]
227  pub async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
228    let fd = self.fd.clone();
229    spawn_blocking(move || {
230      let mut buf = vec![0u8; len.try_into().unwrap()];
231      fd.read_exact_at(&mut buf, offset).unwrap();
232      buf
233    })
234    .await
235    .unwrap()
236  }
237
238  #[cfg(feature = "mmap")]
239  pub async fn write_at<D: AsRef<[u8]> + Send + 'static>(&self, offset: u64, data: D) {
240    let offset = usz!(offset);
241    let len = data.as_ref().len();
242    let started = Instant::now();
243
244    let mmap = self.mmap.clone();
245    let mmap_len = self.mmap_len;
246    spawn_blocking(move || {
247      let memory = unsafe { std::slice::from_raw_parts_mut(mmap.as_mut_ptr(), mmap_len) };
248      memory[offset..offset + len].copy_from_slice(data.as_ref());
249    })
250    .await
251    .unwrap();
252
253    // This could be significant e.g. page fault.
254    let call_us: u64 = started.elapsed().as_micros().try_into().unwrap();
255    self.bump_write_metrics(len.try_into().unwrap(), call_us);
256  }
257
258  #[cfg(feature = "mmap")]
259  pub fn write_at_sync<D: AsRef<[u8]> + Send + 'static>(&self, offset: u64, data: D) -> () {
260    let offset = usz!(offset);
261    let len = data.as_ref().len();
262    let started = Instant::now();
263
264    let memory = unsafe { std::slice::from_raw_parts_mut(self.mmap.as_mut_ptr(), self.mmap_len) };
265    memory[offset..offset + len].copy_from_slice(data.as_ref());
266
267    // This could be significant e.g. page fault.
268    let call_us: u64 = started.elapsed().as_micros().try_into().unwrap();
269    self.bump_write_metrics(len.try_into().unwrap(), call_us);
270  }
271
272  #[cfg(feature = "tokio_file")]
273  pub async fn write_at<D: AsRef<[u8]> + Send + 'static>(&self, offset: u64, data: D) {
274    let fd = self.fd.clone();
275    let len: u64 = data.as_ref().len().try_into().unwrap();
276    let started = Instant::now();
277    spawn_blocking(move || fd.write_all_at(data.as_ref(), offset).unwrap())
278      .await
279      .unwrap();
280    // Yes, we're including the overhead of Tokio's spawn_blocking.
281    let call_us: u64 = started.elapsed().as_micros().try_into().unwrap();
282    self.bump_write_metrics(len, call_us);
283  }
284
285  #[cfg(any(feature = "mmap", feature = "tokio_file"))]
286  pub async fn write_at_with_delayed_sync<D: AsRef<[u8]> + Send + 'static>(
287    &self,
288    writes: impl IntoIterator<Item = WriteRequest<D>>,
289  ) {
290    let mut count: u64 = 0;
291    // WARNING: We cannot make this concurrent as writes must be logically applied in order.
292    for w in writes {
293      count += 1;
294      self.write_at(w.offset, w.data).await;
295    }
296
297    let (fut, fut_ctl) = SignalFuture::new();
298
299    {
300      let mut state = self.pending_sync_state.lock().await;
301      let now = Instant::now();
302      state.earliest_unsynced.get_or_insert(now);
303      state.latest_unsynced = Some(now);
304      state.pending_sync_fut_states.push(fut_ctl.clone());
305    };
306
307    self
308      .metrics
309      .sync_delayed_counter
310      .fetch_add(count, Ordering::Relaxed);
311
312    fut.await;
313  }
314
315  pub async fn start_delayed_data_sync_background_loop(&self) {
316    // Store these outside and reuse them to avoid reallocations on each loop.
317    let mut futures_to_wake = Vec::new();
318    loop {
319      sleep(std::time::Duration::from_micros(self.sync_delay_us)).await;
320
321      struct SyncNow {
322        longest_delay_us: u64,
323        shortest_delay_us: u64,
324      }
325
326      let sync_now = {
327        let mut state = self.pending_sync_state.lock().await;
328
329        if !state.pending_sync_fut_states.is_empty() {
330          let longest_delay_us = dur_us(state.earliest_unsynced.unwrap());
331          let shortest_delay_us = dur_us(state.latest_unsynced.unwrap());
332
333          state.earliest_unsynced = None;
334          state.latest_unsynced = None;
335
336          futures_to_wake.extend(state.pending_sync_fut_states.drain(..));
337
338          Some(SyncNow {
339            longest_delay_us,
340            shortest_delay_us,
341          })
342        } else {
343          None
344        }
345      };
346
347      if let Some(SyncNow {
348        longest_delay_us,
349        shortest_delay_us,
350      }) = sync_now
351      {
352        // OPTIMISATION: Don't perform these atomic operations while unnecessarily holding up the lock.
353        self
354          .metrics
355          .sync_longest_delay_us_counter
356          .fetch_add(longest_delay_us, Ordering::Relaxed);
357        self
358          .metrics
359          .sync_shortest_delay_us_counter
360          .fetch_add(shortest_delay_us, Ordering::Relaxed);
361
362        self.sync_data().await;
363
364        for ft in futures_to_wake.drain(..) {
365          ft.signal();
366        }
367      };
368
369      self
370        .metrics
371        .sync_background_loops_counter
372        .fetch_add(1, Ordering::Relaxed);
373    }
374  }
375
376  pub async fn sync_data(&self) {
377    #[cfg(feature = "tokio_file")]
378    let fd = self.fd.clone();
379    #[cfg(feature = "mmap")]
380    let mmap = self.mmap.clone();
381
382    let started = Instant::now();
383    spawn_blocking(move || {
384      #[cfg(feature = "tokio_file")]
385      fd.sync_data().unwrap();
386
387      #[cfg(feature = "mmap")]
388      mmap.flush().unwrap();
389    })
390    .await
391    .unwrap();
392    // Yes, we're including the overhead of Tokio's spawn_blocking.
393    let sync_us: u64 = started.elapsed().as_micros().try_into().unwrap();
394    self.metrics.sync_counter.fetch_add(1, Ordering::Relaxed);
395    self
396      .metrics
397      .sync_us_counter
398      .fetch_add(sync_us, Ordering::Relaxed);
399  }
400}
401
// Synchronous off64 read traits, available with the mmap feature only.
#[cfg(feature = "mmap")]
impl<'a> Off64Read<'a, Vec<u8>> for SeekableAsyncFile {
  /// Delegates to the inherent `read_at_sync`.
  fn read_at(&'a self, offset: u64, len: u64) -> Vec<u8> {
    SeekableAsyncFile::read_at_sync(self, offset, len)
  }
}
#[cfg(feature = "mmap")]
impl<'a> Off64ReadChrono<'a, Vec<u8>> for SeekableAsyncFile {}
#[cfg(feature = "mmap")]
impl<'a> Off64ReadInt<'a, Vec<u8>> for SeekableAsyncFile {}
412
// Synchronous off64 write traits, available with the mmap feature only.
#[cfg(feature = "mmap")]
impl Off64Write for SeekableAsyncFile {
  /// Delegates to the inherent `write_at_sync`, which takes owned data, so `value` is copied into a `Vec` first.
  fn write_at(&self, offset: u64, value: &[u8]) -> () {
    let owned = value.to_vec();
    self.write_at_sync(offset, owned)
  }
}
#[cfg(feature = "mmap")]
impl Off64WriteChrono for SeekableAsyncFile {}
#[cfg(feature = "mmap")]
impl Off64WriteInt for SeekableAsyncFile {}
423
// Asynchronous off64 read traits, available with either backend feature.
#[cfg(any(feature = "mmap", feature = "tokio_file"))]
impl<'a> Off64AsyncRead<'a, Vec<u8>> for SeekableAsyncFile {
  /// Delegates to the inherent async `read_at`. The type is named explicitly so the inherent method is the one called, not this trait method.
  async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
    let pending_read = SeekableAsyncFile::read_at(self, offset, len);
    pending_read.await
  }
}
#[cfg(any(feature = "mmap", feature = "tokio_file"))]
impl<'a> Off64AsyncReadChrono<'a, Vec<u8>> for SeekableAsyncFile {}
#[cfg(any(feature = "mmap", feature = "tokio_file"))]
impl<'a> Off64AsyncReadInt<'a, Vec<u8>> for SeekableAsyncFile {}
434
// Asynchronous off64 write traits, available with either backend feature.
#[cfg(any(feature = "mmap", feature = "tokio_file"))]
impl Off64AsyncWrite for SeekableAsyncFile {
  /// Delegates to the inherent async `write_at`, which takes owned `'static` data, so `value` is copied into a `Vec` first.
  async fn write_at(&self, offset: u64, value: &[u8]) {
    let owned = value.to_vec();
    SeekableAsyncFile::write_at(self, offset, owned).await
  }
}
#[cfg(any(feature = "mmap", feature = "tokio_file"))]
impl Off64AsyncWriteChrono for SeekableAsyncFile {}
#[cfg(any(feature = "mmap", feature = "tokio_file"))]
impl Off64AsyncWriteInt for SeekableAsyncFile {}