// seekable_async_file/lib.rs

1pub mod common;
2pub mod file;
3pub mod merge;
4pub mod mmap;
5
6use crate::common::IAsyncIO;
7use crate::merge::merge_overlapping_writes;
8use async_trait::async_trait;
9use futures::stream::iter;
10use futures::stream::StreamExt;
11use off64::chrono::Off64AsyncReadChrono;
12use off64::chrono::Off64AsyncWriteChrono;
13use off64::int::Off64AsyncReadInt;
14use off64::int::Off64AsyncWriteInt;
15use off64::u64;
16use off64::Off64AsyncRead;
17use off64::Off64AsyncWrite;
18use signal_future::SignalFuture;
19use signal_future::SignalFutureController;
20use std::io::SeekFrom;
21use std::path::Path;
22use std::sync::atomic::AtomicU64;
23use std::sync::atomic::Ordering;
24use std::sync::Arc;
25use std::time::Duration;
26use tokio::fs::File;
27use tokio::io;
28use tokio::io::AsyncSeekExt;
29use tokio::sync::Mutex;
30use tokio::time::sleep;
31use tokio::time::Instant;
32
33pub async fn get_file_len_via_seek(path: &Path) -> io::Result<u64> {
34  let mut file = File::open(path).await?;
35  // Note that `file.metadata().len()` is 0 for device files.
36  file.seek(SeekFrom::End(0)).await
37}
38
/// Returns the number of microseconds elapsed since `started`.
///
/// The parameter is an `Instant` (a point in time), not a `Duration`, so it
/// is named accordingly. The `try_into().unwrap()` cannot overflow in
/// practice: `u64` microseconds covers ~584,000 years of elapsed time.
fn dur_us(started: Instant) -> u64 {
  started.elapsed().as_micros().try_into().unwrap()
}
42
/// A single positioned write: a byte payload plus the file offset it should
/// land at. Provided to `write_at_with_delayed_sync`.
pub struct WriteRequest<D: AsRef<[u8]> + Send + 'static> {
  offset: u64,
  data: D,
}

impl<D: AsRef<[u8]> + Send + 'static> WriteRequest<D> {
  /// Creates a request to write `data` starting at byte `offset`.
  pub fn new(offset: u64, data: D) -> Self {
    Self { offset, data }
  }
}
54
55struct PendingSyncState {
56  earliest_unsynced: Option<Instant>, // Only set when first pending_sync_fut_states is created; otherwise, metrics are misleading as we'd count time when no one is waiting for a sync as delayed sync time.
57  latest_unsynced: Option<Instant>,
58  pending_sync_fut_states: Vec<SignalFutureController>,
59}
60
/// Metrics populated by a `SeekableAsyncFile`. There should be exactly one per `SeekableAsyncFile`; don't share between multiple `SeekableAsyncFile` values.
///
/// To initialise, use `SeekableAsyncFileMetrics::default()`. The values can be accessed via the thread-safe getter methods.
#[derive(Default, Debug)]
pub struct SeekableAsyncFileMetrics {
  sync_background_loops_counter: AtomicU64,
  sync_counter: AtomicU64,
  sync_delayed_counter: AtomicU64,
  sync_longest_delay_us_counter: AtomicU64,
  sync_shortest_delay_us_counter: AtomicU64,
  sync_us_counter: AtomicU64,
  write_bytes_counter: AtomicU64,
  write_counter: AtomicU64,
  write_us_counter: AtomicU64,
}

impl SeekableAsyncFileMetrics {
  // Shared load used by every getter; `Relaxed` suffices as each counter is
  // an independent statistic with no cross-counter ordering requirement.
  #[inline]
  fn get(counter: &AtomicU64) -> u64 {
    counter.load(Ordering::Relaxed)
  }

  /// Total number of delayed sync background loop iterations.
  pub fn sync_background_loops_counter(&self) -> u64 {
    Self::get(&self.sync_background_loops_counter)
  }

  /// Total number of fsync and fdatasync syscalls.
  pub fn sync_counter(&self) -> u64 {
    Self::get(&self.sync_counter)
  }

  /// Total number of requested syncs that were delayed until a later time.
  pub fn sync_delayed_counter(&self) -> u64 {
    Self::get(&self.sync_delayed_counter)
  }

  /// Total number of microseconds spent waiting for a sync by one or more delayed syncs.
  pub fn sync_longest_delay_us_counter(&self) -> u64 {
    Self::get(&self.sync_longest_delay_us_counter)
  }

  /// Total number of microseconds spent waiting after a final delayed sync before the actual sync.
  pub fn sync_shortest_delay_us_counter(&self) -> u64 {
    Self::get(&self.sync_shortest_delay_us_counter)
  }

  /// Total number of microseconds spent in fsync and fdatasync syscalls.
  pub fn sync_us_counter(&self) -> u64 {
    Self::get(&self.sync_us_counter)
  }

  /// Total number of bytes written.
  pub fn write_bytes_counter(&self) -> u64 {
    Self::get(&self.write_bytes_counter)
  }

  /// Total number of write syscalls.
  pub fn write_counter(&self) -> u64 {
    Self::get(&self.write_counter)
  }

  /// Total number of microseconds spent in write syscalls.
  pub fn write_us_counter(&self) -> u64 {
    Self::get(&self.write_us_counter)
  }
}
123
124/// A `File`-like value that can perform async `read_at` and `write_at` for I/O at specific offsets without mutating any state (i.e. is thread safe). Metrics are collected, and syncs can be delayed for write batching opportunities as a performance optimisation.
125#[derive(Clone)]
126pub struct SeekableAsyncFile {
127  io: Arc<dyn IAsyncIO>,
128  sync_delay_us: u64,
129  metrics: Arc<SeekableAsyncFileMetrics>,
130  pending_sync_state: Arc<Mutex<PendingSyncState>>,
131}
132
impl SeekableAsyncFile {
  /// Wraps the `io` backend with metrics collection and delayed-sync write
  /// batching.
  ///
  /// `sync_delay` is the interval of the background sync loop, i.e. the upper
  /// bound on how long a `write_at_with_delayed_sync` caller waits for its
  /// shared sync. Panics if `sync_delay` in microseconds overflows `u64`.
  ///
  /// NOTE(review): despite being `async`, this body performs no awaits —
  /// presumably kept async for API symmetry; confirm before changing.
  pub async fn open(
    io: Arc<dyn IAsyncIO>,
    metrics: Arc<SeekableAsyncFileMetrics>,
    sync_delay: Duration,
  ) -> Self {
    SeekableAsyncFile {
      io,
      sync_delay_us: sync_delay.as_micros().try_into().unwrap(),
      metrics,
      pending_sync_state: Arc::new(Mutex::new(PendingSyncState {
        earliest_unsynced: None,
        latest_unsynced: None,
        pending_sync_fut_states: Vec::new(),
      })),
    }
  }

  /// Records one write: adds `len` to the byte counter, bumps the write
  /// counter, and adds `call_us` to the time-in-write counter.
  fn bump_write_metrics(&self, len: u64, call_us: u64) {
    self
      .metrics
      .write_bytes_counter
      .fetch_add(len, Ordering::Relaxed);
    self.metrics.write_counter.fetch_add(1, Ordering::Relaxed);
    self
      .metrics
      .write_us_counter
      .fetch_add(call_us, Ordering::Relaxed);
  }

  /// Reads `len` bytes starting at byte `offset`, delegating to the backend.
  pub async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
    self.io.read_at(offset, len).await
  }

  /// Writes `data` at byte `offset` via the backend and records write
  /// metrics (bytes, call count, time spent).
  pub async fn write_at<D: AsRef<[u8]> + Send + 'static>(&self, offset: u64, data: D) {
    let len = data.as_ref().len();
    let started = Instant::now();
    self.io.write_at(offset, data.as_ref()).await;
    let call_us: u64 = started.elapsed().as_micros().try_into().unwrap();
    self.bump_write_metrics(len.try_into().unwrap(), call_us);
  }

  /// Performs the given writes, then resolves only after the background loop
  /// (`start_delayed_data_sync_background_loop`) has performed a data sync
  /// covering them.
  ///
  /// Overlapping requests are merged before being issued; later writes win
  /// where there are overlaps. `sync_delayed_counter` is incremented once per
  /// original (pre-merge) request.
  pub async fn write_at_with_delayed_sync<D: AsRef<[u8]> + Send + 'static>(
    &self,
    writes: impl IntoIterator<Item = WriteRequest<D>>,
  ) {
    // Count original writes and merge overlapping intervals.
    let writes_vec: Vec<_> = writes.into_iter().collect();
    let count = u64!(writes_vec.len());
    let intervals = merge_overlapping_writes(writes_vec);

    // Execute all non-overlapping writes concurrently.
    iter(intervals)
      .for_each_concurrent(None, async |(offset, (_, data))| {
        self.write_at(offset, data).await;
      })
      .await;

    let (fut, fut_ctl) = SignalFuture::new();

    // Register this caller with the background loop: record delay timestamps
    // and enqueue our future's controller so the loop can wake us post-sync.
    // The lock is held only for this small scope.
    {
      let mut state = self.pending_sync_state.lock().await;
      let now = Instant::now();
      state.earliest_unsynced.get_or_insert(now);
      state.latest_unsynced = Some(now);
      state.pending_sync_fut_states.push(fut_ctl.clone());
    };

    self
      .metrics
      .sync_delayed_counter
      .fetch_add(count, Ordering::Relaxed);

    // Resolved by the background loop after it calls `sync_data`.
    fut.await;
  }

  /// Runs forever, waking every `sync_delay_us` microseconds; if any delayed
  /// writes are pending, performs a single shared data sync and then signals
  /// all their futures. Spawn this as its own task alongside callers of
  /// `write_at_with_delayed_sync`, or those callers will never resolve.
  pub async fn start_delayed_data_sync_background_loop(&self) {
    // Store these outside and reuse them to avoid reallocations on each loop.
    let mut futures_to_wake = Vec::new();
    loop {
      sleep(std::time::Duration::from_micros(self.sync_delay_us)).await;

      // Delay metrics snapshotted under the lock for this iteration.
      struct SyncNow {
        longest_delay_us: u64,
        shortest_delay_us: u64,
      }

      let sync_now = {
        let mut state = self.pending_sync_state.lock().await;

        if !state.pending_sync_fut_states.is_empty() {
          // Unwraps are safe: both timestamps are set whenever
          // pending_sync_fut_states is non-empty (see
          // write_at_with_delayed_sync).
          let longest_delay_us = dur_us(state.earliest_unsynced.unwrap());
          let shortest_delay_us = dur_us(state.latest_unsynced.unwrap());

          state.earliest_unsynced = None;
          state.latest_unsynced = None;

          futures_to_wake.extend(state.pending_sync_fut_states.drain(..));

          Some(SyncNow {
            longest_delay_us,
            shortest_delay_us,
          })
        } else {
          None
        }
      };

      if let Some(SyncNow {
        longest_delay_us,
        shortest_delay_us,
      }) = sync_now
      {
        // OPTIMISATION: Don't perform these atomic operations while unnecessarily holding up the lock.
        self
          .metrics
          .sync_longest_delay_us_counter
          .fetch_add(longest_delay_us, Ordering::Relaxed);
        self
          .metrics
          .sync_shortest_delay_us_counter
          .fetch_add(shortest_delay_us, Ordering::Relaxed);

        self.sync_data().await;

        // Only signal waiters after the sync has completed, so a resolved
        // future implies the data is durable.
        for ft in futures_to_wake.drain(..) {
          ft.signal();
        }
      };

      self
        .metrics
        .sync_background_loops_counter
        .fetch_add(1, Ordering::Relaxed);
    }
  }

  /// Forces a data sync on the backend immediately and records sync metrics
  /// (call count and time spent).
  pub async fn sync_data(&self) {
    let started = Instant::now();
    self.io.sync_data().await;
    let sync_us: u64 = started.elapsed().as_micros().try_into().unwrap();
    self.metrics.sync_counter.fetch_add(1, Ordering::Relaxed);
    self
      .metrics
      .sync_us_counter
      .fetch_add(sync_us, Ordering::Relaxed);
  }
}
282
// Adapt `SeekableAsyncFile` to the off64 async-read traits by delegating to
// the inherent `read_at`; the chrono/int variants are provided by off64 on
// top of this base impl.
#[async_trait]
impl<'a> Off64AsyncRead<'a, Vec<u8>> for SeekableAsyncFile {
  async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
    SeekableAsyncFile::read_at(self, offset, len).await
  }
}
impl<'a> Off64AsyncReadChrono<'a, Vec<u8>> for SeekableAsyncFile {}
impl<'a> Off64AsyncReadInt<'a, Vec<u8>> for SeekableAsyncFile {}
291
// Adapt `SeekableAsyncFile` to the off64 async-write traits. The borrowed
// `value` is copied into an owned Vec because the inherent `write_at` takes
// ownership of its data argument.
#[async_trait]
impl Off64AsyncWrite for SeekableAsyncFile {
  async fn write_at(&self, offset: u64, value: &[u8]) {
    SeekableAsyncFile::write_at(self, offset, value.to_vec()).await
  }
}
impl Off64AsyncWriteChrono for SeekableAsyncFile {}
impl Off64AsyncWriteInt for SeekableAsyncFile {}