random_access_disk/
lib.rs

1#![forbid(missing_docs)]
2#![cfg_attr(test, deny(warnings))]
3#![doc(test(attr(deny(warnings))))]
4
5//! # Continuously read and write to disk, using random offsets and lengths
6//! [RandomAccessDisk] is a complete implementation of [random-access-storage](https://docs.rs/random-access-storage)
//! for on-disk storage.
8//!
9//! See also [random-access-memory](https://docs.rs/random-access-memory) for in-memory storage
10//! that can be swapped with this.
11//!
12//! ## Features
13//!
14//! ### `sparse` (default)
15//!
//! Deleting may create sparse files; on by default. Creation of sparse files is tested on macOS, Linux and Windows.
17//!
18//! **NB**: If this is on, `unsafe` code is used to make direct platform-specific calls!
19//!
20//! ### `async-std` (default)
21//!
22//! Use the async-std runtime, on by default. Either this or `tokio` is mandatory.
23//!
24//! ### `tokio`
25//!
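//! Use the tokio runtime. Either this or `async-std` is mandatory. The two features are mutually
//! exclusive, so using `tokio` requires turning off the default `async-std` feature.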
27//!
28//! ## Examples
29//!
30//! Reading, writing, deleting and truncating:
31//!
32//! ```
33//! # #[cfg(feature = "tokio")]
34//! # tokio_test::block_on(async {
35//! # example().await;
36//! # });
37//! # #[cfg(feature = "async-std")]
38//! # async_std::task::block_on(async {
39//! # example().await;
40//! # });
41//! # async fn example() {
42//! use random_access_storage::RandomAccess;
43//! use random_access_disk::RandomAccessDisk;
44//!
45//! let path = tempfile::Builder::new().prefix("basic").tempfile().unwrap().into_temp_path();
46//! let mut storage = RandomAccessDisk::open(path.to_path_buf()).await.unwrap();
47//! storage.write(0, b"hello").await.unwrap();
48//! storage.write(5, b" world").await.unwrap();
49//! assert_eq!(storage.read(0, 11).await.unwrap(), b"hello world");
50//! assert_eq!(storage.len().await.unwrap(), 11);
51//! storage.del(5, 2).await.unwrap();
52//! assert_eq!(storage.read(5, 2).await.unwrap(), [0, 0]);
53//! assert_eq!(storage.len().await.unwrap(), 11);
54//! storage.truncate(2).await.unwrap();
55//! assert_eq!(storage.len().await.unwrap(), 2);
56//! storage.truncate(5).await.unwrap();
57//! assert_eq!(storage.len().await.unwrap(), 5);
58//! assert_eq!(storage.read(0, 5).await.unwrap(), [b'h', b'e', 0, 0, 0]);
59//! # }
60//! ```
61//!
62//! In order to get benefits from the swappable interface, you will
63//! in most cases want to use generic functions for storage manipulation:
64//!
65//! ```
66//! # #[cfg(feature = "tokio")]
67//! # tokio_test::block_on(async {
68//! # example().await;
69//! # });
70//! # #[cfg(feature = "async-std")]
71//! # async_std::task::block_on(async {
72//! # example().await;
73//! # });
74//! # async fn example() {
75//! use random_access_storage::RandomAccess;
76//! use random_access_disk::RandomAccessDisk;
77//! use std::fmt::Debug;
78//!
79//! let path = tempfile::Builder::new().prefix("swappable").tempfile().unwrap().into_temp_path();
80//! let mut storage = RandomAccessDisk::open(path.to_path_buf()).await.unwrap();
81//! write_hello_world(&mut storage).await;
82//! assert_eq!(read_hello_world(&mut storage).await, b"hello world");
83//!
84//! /// Write with swappable storage
85//! async fn write_hello_world<T>(storage: &mut T)
86//! where T: RandomAccess + Debug + Send,
87//! {
88//!   storage.write(0, b"hello").await.unwrap();
89//!   storage.write(5, b" world").await.unwrap();
90//! }
91//!
92//! /// Read with swappable storage
93//! async fn read_hello_world<T>(storage: &mut T) -> Vec<u8>
94//! where T: RandomAccess + Debug + Send,
95//! {
96//!   storage.read(0, 11).await.unwrap()
97//! }
//! # }
//! ```
99
100#[cfg(not(any(feature = "async-std", feature = "tokio")))]
101compile_error!(
102  "Either feature `random-access-disk/async-std` or `random-access-disk/tokio` must be enabled."
103);
104
105#[cfg(all(feature = "async-std", feature = "tokio"))]
106compile_error!("features `random-access-disk/async-std` and `random-access-disk/tokio` are mutually exclusive");
107
108#[cfg(feature = "async-std")]
109use async_std::{
110  fs::{self, OpenOptions},
111  io::prelude::{SeekExt, WriteExt},
112  io::{ReadExt, SeekFrom},
113};
114use random_access_storage::{RandomAccess, RandomAccessError};
115use std::ops::Drop;
116use std::path;
117
118#[cfg(feature = "tokio")]
119use std::io::SeekFrom;
120#[cfg(feature = "tokio")]
121use tokio::{
122  fs::{self, OpenOptions},
123  io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
124};
125
126#[cfg(all(
127  feature = "sparse",
128  any(
129    target_os = "linux",
130    target_os = "android",
131    target_os = "freebsd",
132    target_os = "macos",
133  )
134))]
135mod unix;
136#[cfg(all(
137  feature = "sparse",
138  any(
139    target_os = "linux",
140    target_os = "android",
141    target_os = "freebsd",
142    target_os = "macos",
143  )
144))]
145use unix::{get_length_and_block_size, set_sparse, trim};
146
147#[cfg(all(feature = "sparse", windows))]
148mod windows;
149#[cfg(all(feature = "sparse", windows))]
150use windows::{get_length_and_block_size, set_sparse, trim};
151
152#[cfg(not(all(
153  feature = "sparse",
154  any(
155    target_os = "linux",
156    target_os = "android",
157    target_os = "freebsd",
158    target_os = "macos",
159    windows,
160  )
161)))]
162mod default;
163
164#[cfg(not(all(
165  feature = "sparse",
166  any(
167    target_os = "linux",
168    target_os = "android",
169    target_os = "freebsd",
170    target_os = "macos",
171    windows,
172  )
173)))]
174use default::{get_length_and_block_size, set_sparse, trim};
175
/// Random-access storage backed by a single file on disk.
///
/// Construct it with [RandomAccessDisk::open] or [RandomAccessDisk::builder].
#[derive(Debug)]
pub struct RandomAccessDisk {
  /// Path of the backing file.
  // Never read by the implementation (presumably kept for `Debug` output);
  // derived impls don't count as uses for the dead-code lint, hence the allow.
  #[allow(dead_code)]
  filename: path::PathBuf,
  /// The open backing file. The builder always sets this to `Some`; the
  /// `RandomAccess` methods panic if it is ever `None`.
  file: Option<fs::File>,
  /// Logical length in bytes, tracked in memory and updated by writes and truncation.
  length: u64,
  /// Block size reported for the file, passed to `trim` when deleting a range.
  block_size: u64,
  /// Whether the file is synced after every mutating operation.
  auto_sync: bool,
}
186
187impl RandomAccessDisk {
188  /// Create a new (auto-sync) instance to storage at `filename`.
189  #[allow(clippy::new_ret_no_self)]
190  pub async fn open(
191    filename: impl AsRef<path::Path>,
192  ) -> Result<RandomAccessDisk, RandomAccessError> {
193    Self::builder(filename).build().await
194  }
195
196  /// Initialize a builder with storage at `filename`.
197  pub fn builder(filename: impl AsRef<path::Path>) -> Builder {
198    Builder::new(filename)
199  }
200}
201
202#[async_trait::async_trait]
203impl RandomAccess for RandomAccessDisk {
204  async fn write(
205    &mut self,
206    offset: u64,
207    data: &[u8],
208  ) -> Result<(), RandomAccessError> {
209    let file = self.file.as_mut().expect("self.file was None.");
210    file.seek(SeekFrom::Start(offset)).await?;
211    file.write_all(data).await?;
212    if self.auto_sync {
213      file.sync_all().await?;
214    }
215
216    // We've changed the length of our file.
217    let new_len = offset + (data.len() as u64);
218    if new_len > self.length {
219      self.length = new_len;
220    }
221
222    Ok(())
223  }
224
225  // NOTE(yw): disabling clippy here because we files on disk might be sparse,
226  // and sometimes you might want to read a bit of memory to check if it's
227  // formatted or not. Returning zero'd out memory seems like an OK thing to do.
228  // We should probably come back to this at a future point, and determine
229  // whether it's okay to return a fully zero'd out slice. It's a bit weird,
230  // because we're replacing empty data with actual zeroes - which does not
231  // reflect the state of the world.
232  // #[cfg_attr(test, allow(unused_io_amount))]
233  async fn read(
234    &mut self,
235    offset: u64,
236    length: u64,
237  ) -> Result<Vec<u8>, RandomAccessError> {
238    if offset + length > self.length {
239      return Err(RandomAccessError::OutOfBounds {
240        offset,
241        end: Some(offset + length),
242        length: self.length,
243      });
244    }
245
246    let file = self.file.as_mut().expect("self.file was None.");
247    let mut buffer = vec![0; length as usize];
248    file.seek(SeekFrom::Start(offset)).await?;
249    let _bytes_read = file.read(&mut buffer[..]).await?;
250    Ok(buffer)
251  }
252
253  async fn del(
254    &mut self,
255    offset: u64,
256    length: u64,
257  ) -> Result<(), RandomAccessError> {
258    if offset > self.length {
259      return Err(RandomAccessError::OutOfBounds {
260        offset,
261        end: None,
262        length: self.length,
263      });
264    };
265
266    if length == 0 {
267      // No-op
268      return Ok(());
269    }
270
271    // Delete is truncate if up to the current length or more is deleted
272    if offset + length >= self.length {
273      return self.truncate(offset).await;
274    }
275
276    let file = self.file.as_mut().expect("self.file was None.");
277    trim(file, offset, length, self.block_size).await?;
278    if self.auto_sync {
279      file.sync_all().await?;
280    }
281    Ok(())
282  }
283
284  async fn truncate(&mut self, length: u64) -> Result<(), RandomAccessError> {
285    let file = self.file.as_ref().expect("self.file was None.");
286    self.length = length;
287    file.set_len(self.length).await?;
288    if self.auto_sync {
289      file.sync_all().await?;
290    }
291    Ok(())
292  }
293
294  async fn len(&mut self) -> Result<u64, RandomAccessError> {
295    Ok(self.length)
296  }
297
298  async fn is_empty(&mut self) -> Result<bool, RandomAccessError> {
299    Ok(self.length == 0)
300  }
301
302  async fn sync_all(&mut self) -> Result<(), RandomAccessError> {
303    if !self.auto_sync {
304      let file = self.file.as_ref().expect("self.file was None.");
305      file.sync_all().await?;
306    }
307    Ok(())
308  }
309}
310
311impl Drop for RandomAccessDisk {
312  fn drop(&mut self) {
313    // We need to flush the file on drop. Unfortunately, that is not possible to do in a
314    // non-blocking fashion, but our only other option here is losing data remaining in the
315    // write cache. Good task schedulers should be resilient to occasional blocking hiccups in
316    // file destructors so we don't expect this to be a common problem in practice.
317    // (from async_std::fs::File::drop)
318    #[cfg(feature = "async-std")]
319    if let Some(file) = &self.file {
320      let _ = async_std::task::block_on(file.sync_all());
321    }
322    // For tokio, the below errors with:
323    //
324    // "Cannot start a runtime from within a runtime. This happens because a function (like
325    // `block_on`) attempted to block the current thread while the thread is being used to
326    // drive asynchronous tasks."
327    //
328    // There doesn't seem to be an equivalent block_on version for tokio that actually works
329    // in a sync drop(), so for tokio, we'll need to wait for a real AsyncDrop to arrive.
330    //
331    // #[cfg(feature = "tokio")]
332    // if let Some(file) = &self.file {
333    //   tokio::runtime::Handle::current()
334    //     .block_on(file.sync_all())
335    //     .expect("Could not sync file changes on drop.");
336    // }
337  }
338}
339
/// Builder for [RandomAccessDisk].
///
/// Create one with [Builder::new] or [RandomAccessDisk::builder]; auto-sync
/// is enabled by default.
#[derive(Debug)]
pub struct Builder {
  /// Path of the backing file to open or create.
  filename: path::PathBuf,
  /// Whether the built storage syncs after every mutating operation.
  auto_sync: bool,
}
345
346impl Builder {
347  /// Create new builder at `path` (with auto-sync true by default).
348  pub fn new(filename: impl AsRef<path::Path>) -> Self {
349    Self {
350      filename: filename.as_ref().into(),
351      auto_sync: true,
352    }
353  }
354
355  /// Set auto-sync
356  // NB: Because of no AsyncDrop, tokio can not ensure that changes are synced when dropped,
357  // see impl Drop above.
358  #[cfg(feature = "async-std")]
359  pub fn auto_sync(mut self, auto_sync: bool) -> Self {
360    self.auto_sync = auto_sync;
361    self
362  }
363
364  /// Build a [RandomAccessDisk] instance
365  pub async fn build(self) -> Result<RandomAccessDisk, RandomAccessError> {
366    if let Some(dirname) = self.filename.parent() {
367      mkdirp::mkdirp(dirname)?;
368    }
369    let mut file = OpenOptions::new()
370      .create(true)
371      .read(true)
372      .write(true)
373      .open(&self.filename)
374      .await?;
375    file.sync_all().await?;
376
377    set_sparse(&mut file).await?;
378
379    let (length, block_size) = get_length_and_block_size(&file).await?;
380    Ok(RandomAccessDisk {
381      filename: self.filename,
382      file: Some(file),
383      length,
384      auto_sync: self.auto_sync,
385      block_size,
386    })
387  }
388}