seekable_async_file/
lib.rs1pub mod common;
2pub mod file;
3pub mod merge;
4pub mod mmap;
5
6use crate::common::IAsyncIO;
7use crate::merge::merge_overlapping_writes;
8use async_trait::async_trait;
9use futures::stream::iter;
10use futures::stream::StreamExt;
11use off64::chrono::Off64AsyncReadChrono;
12use off64::chrono::Off64AsyncWriteChrono;
13use off64::int::Off64AsyncReadInt;
14use off64::int::Off64AsyncWriteInt;
15use off64::u64;
16use off64::Off64AsyncRead;
17use off64::Off64AsyncWrite;
18use signal_future::SignalFuture;
19use signal_future::SignalFutureController;
20use std::io::SeekFrom;
21use std::path::Path;
22use std::sync::atomic::AtomicU64;
23use std::sync::atomic::Ordering;
24use std::sync::Arc;
25use std::time::Duration;
26use tokio::fs::File;
27use tokio::io;
28use tokio::io::AsyncSeekExt;
29use tokio::sync::Mutex;
30use tokio::time::sleep;
31use tokio::time::Instant;
32
33pub async fn get_file_len_via_seek(path: &Path) -> io::Result<u64> {
34 let mut file = File::open(path).await?;
35 file.seek(SeekFrom::End(0)).await
37}
38
/// Microseconds elapsed since `since`, as a `u64`.
///
/// # Panics
/// Panics if the elapsed time, in whole microseconds, does not fit in a `u64`.
fn dur_us(since: Instant) -> u64 {
  let elapsed_us = since.elapsed().as_micros();
  u64::try_from(elapsed_us).unwrap()
}
42
/// A single buffered write: `data` to be written starting at byte `offset`.
///
/// Batches of these are passed to [`SeekableAsyncFile::write_at_with_delayed_sync`].
/// The bounds needed to perform the write live on the `impl` below rather than on the
/// struct itself, so merely naming `WriteRequest<D>` in generic code does not force
/// callers to repeat them.
pub struct WriteRequest<D> {
  /// Bytes to write.
  data: D,
  /// Byte offset in the file at which `data` starts.
  offset: u64,
}

impl<D: AsRef<[u8]> + Send + 'static> WriteRequest<D> {
  /// Creates a request to write `data` starting at byte `offset`.
  pub fn new(offset: u64, data: D) -> Self {
    Self { data, offset }
  }
}
54
55struct PendingSyncState {
56 earliest_unsynced: Option<Instant>, latest_unsynced: Option<Instant>,
58 pending_sync_fut_states: Vec<SignalFutureController>,
59}
60
/// Cumulative counters describing the I/O performed by a `SeekableAsyncFile`.
///
/// Within this file every counter is only ever increased (`fetch_add`) using relaxed
/// atomics. A set of values read through the getters is therefore not guaranteed to be
/// a mutually consistent snapshot.
#[derive(Default, Debug)]
pub struct SeekableAsyncFileMetrics {
  /// Iterations completed by the background delayed-sync loop.
  sync_background_loops_counter: AtomicU64,
  /// Calls to `sync_data`.
  sync_counter: AtomicU64,
  /// Write requests submitted via `write_at_with_delayed_sync`, counted before merging.
  sync_delayed_counter: AtomicU64,
  /// Sum over background syncs of the age (µs) of the oldest waiting batch.
  sync_longest_delay_us_counter: AtomicU64,
  /// Sum over background syncs of the age (µs) of the newest waiting batch.
  sync_shortest_delay_us_counter: AtomicU64,
  /// Total time (µs) spent inside the backend's `sync_data`.
  sync_us_counter: AtomicU64,
  /// Total bytes passed to `write_at`.
  write_bytes_counter: AtomicU64,
  /// Calls to `write_at`.
  write_counter: AtomicU64,
  /// Total time (µs) spent inside the backend's `write_at`.
  write_us_counter: AtomicU64,
}

impl SeekableAsyncFileMetrics {
  /// Shared relaxed load used by every getter below.
  fn read_counter(counter: &AtomicU64) -> u64 {
    counter.load(Ordering::Relaxed)
  }

  pub fn sync_background_loops_counter(&self) -> u64 {
    Self::read_counter(&self.sync_background_loops_counter)
  }

  pub fn sync_counter(&self) -> u64 {
    Self::read_counter(&self.sync_counter)
  }

  pub fn sync_delayed_counter(&self) -> u64 {
    Self::read_counter(&self.sync_delayed_counter)
  }

  pub fn sync_longest_delay_us_counter(&self) -> u64 {
    Self::read_counter(&self.sync_longest_delay_us_counter)
  }

  pub fn sync_shortest_delay_us_counter(&self) -> u64 {
    Self::read_counter(&self.sync_shortest_delay_us_counter)
  }

  pub fn sync_us_counter(&self) -> u64 {
    Self::read_counter(&self.sync_us_counter)
  }

  pub fn write_bytes_counter(&self) -> u64 {
    Self::read_counter(&self.write_bytes_counter)
  }

  pub fn write_counter(&self) -> u64 {
    Self::read_counter(&self.write_counter)
  }

  pub fn write_us_counter(&self) -> u64 {
    Self::read_counter(&self.write_us_counter)
  }
}
123
124#[derive(Clone)]
126pub struct SeekableAsyncFile {
127 io: Arc<dyn IAsyncIO>,
128 sync_delay_us: u64,
129 metrics: Arc<SeekableAsyncFileMetrics>,
130 pending_sync_state: Arc<Mutex<PendingSyncState>>,
131}
132
133impl SeekableAsyncFile {
134 pub async fn open(
135 io: Arc<dyn IAsyncIO>,
136 metrics: Arc<SeekableAsyncFileMetrics>,
137 sync_delay: Duration,
138 ) -> Self {
139 SeekableAsyncFile {
140 io,
141 sync_delay_us: sync_delay.as_micros().try_into().unwrap(),
142 metrics,
143 pending_sync_state: Arc::new(Mutex::new(PendingSyncState {
144 earliest_unsynced: None,
145 latest_unsynced: None,
146 pending_sync_fut_states: Vec::new(),
147 })),
148 }
149 }
150
151 fn bump_write_metrics(&self, len: u64, call_us: u64) {
152 self
153 .metrics
154 .write_bytes_counter
155 .fetch_add(len, Ordering::Relaxed);
156 self.metrics.write_counter.fetch_add(1, Ordering::Relaxed);
157 self
158 .metrics
159 .write_us_counter
160 .fetch_add(call_us, Ordering::Relaxed);
161 }
162
163 pub async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
164 self.io.read_at(offset, len).await
165 }
166
167 pub async fn write_at<D: AsRef<[u8]> + Send + 'static>(&self, offset: u64, data: D) {
168 let len = data.as_ref().len();
169 let started = Instant::now();
170 self.io.write_at(offset, data.as_ref()).await;
171 let call_us: u64 = started.elapsed().as_micros().try_into().unwrap();
172 self.bump_write_metrics(len.try_into().unwrap(), call_us);
173 }
174
175 pub async fn write_at_with_delayed_sync<D: AsRef<[u8]> + Send + 'static>(
177 &self,
178 writes: impl IntoIterator<Item = WriteRequest<D>>,
179 ) {
180 let writes_vec: Vec<_> = writes.into_iter().collect();
182 let count = u64!(writes_vec.len());
183 let intervals = merge_overlapping_writes(writes_vec);
184
185 iter(intervals)
187 .for_each_concurrent(None, async |(offset, (_, data))| {
188 self.write_at(offset, data).await;
189 })
190 .await;
191
192 let (fut, fut_ctl) = SignalFuture::new();
193
194 {
195 let mut state = self.pending_sync_state.lock().await;
196 let now = Instant::now();
197 state.earliest_unsynced.get_or_insert(now);
198 state.latest_unsynced = Some(now);
199 state.pending_sync_fut_states.push(fut_ctl.clone());
200 };
201
202 self
203 .metrics
204 .sync_delayed_counter
205 .fetch_add(count, Ordering::Relaxed);
206
207 fut.await;
208 }
209
210 pub async fn start_delayed_data_sync_background_loop(&self) {
211 let mut futures_to_wake = Vec::new();
213 loop {
214 sleep(std::time::Duration::from_micros(self.sync_delay_us)).await;
215
216 struct SyncNow {
217 longest_delay_us: u64,
218 shortest_delay_us: u64,
219 }
220
221 let sync_now = {
222 let mut state = self.pending_sync_state.lock().await;
223
224 if !state.pending_sync_fut_states.is_empty() {
225 let longest_delay_us = dur_us(state.earliest_unsynced.unwrap());
226 let shortest_delay_us = dur_us(state.latest_unsynced.unwrap());
227
228 state.earliest_unsynced = None;
229 state.latest_unsynced = None;
230
231 futures_to_wake.extend(state.pending_sync_fut_states.drain(..));
232
233 Some(SyncNow {
234 longest_delay_us,
235 shortest_delay_us,
236 })
237 } else {
238 None
239 }
240 };
241
242 if let Some(SyncNow {
243 longest_delay_us,
244 shortest_delay_us,
245 }) = sync_now
246 {
247 self
249 .metrics
250 .sync_longest_delay_us_counter
251 .fetch_add(longest_delay_us, Ordering::Relaxed);
252 self
253 .metrics
254 .sync_shortest_delay_us_counter
255 .fetch_add(shortest_delay_us, Ordering::Relaxed);
256
257 self.sync_data().await;
258
259 for ft in futures_to_wake.drain(..) {
260 ft.signal();
261 }
262 };
263
264 self
265 .metrics
266 .sync_background_loops_counter
267 .fetch_add(1, Ordering::Relaxed);
268 }
269 }
270
271 pub async fn sync_data(&self) {
272 let started = Instant::now();
273 self.io.sync_data().await;
274 let sync_us: u64 = started.elapsed().as_micros().try_into().unwrap();
275 self.metrics.sync_counter.fetch_add(1, Ordering::Relaxed);
276 self
277 .metrics
278 .sync_us_counter
279 .fetch_add(sync_us, Ordering::Relaxed);
280 }
281}
282
283#[async_trait]
284impl<'a> Off64AsyncRead<'a, Vec<u8>> for SeekableAsyncFile {
285 async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
286 SeekableAsyncFile::read_at(self, offset, len).await
287 }
288}
289impl<'a> Off64AsyncReadChrono<'a, Vec<u8>> for SeekableAsyncFile {}
290impl<'a> Off64AsyncReadInt<'a, Vec<u8>> for SeekableAsyncFile {}
291
292#[async_trait]
293impl Off64AsyncWrite for SeekableAsyncFile {
294 async fn write_at(&self, offset: u64, value: &[u8]) {
295 SeekableAsyncFile::write_at(self, offset, value.to_vec()).await
296 }
297}
298impl Off64AsyncWriteChrono for SeekableAsyncFile {}
299impl Off64AsyncWriteInt for SeekableAsyncFile {}