Skip to main content

random_access_disk/
lib.rs

1#![forbid(missing_docs)]
2#![cfg_attr(test, deny(warnings))]
3#![doc(test(attr(deny(warnings))))]
4
5//! # Continuously read and write to disk, using random offsets and lengths
6//! [RandomAccessDisk] is a complete implementation of [random-access-storage](https://docs.rs/random-access-storage)
//! for on-disk storage.
8//!
9//! See also [random-access-memory](https://docs.rs/random-access-memory) for in-memory storage
10//! that can be swapped with this.
11//!
12//! ## Features
13//!
14//! ### `sparse` (default)
15//!
//! Deleting may create sparse files. This feature is on by default; creation of sparse files is tested on macOS, Linux and Windows.
17//!
18//! **NB**: If this is on, `unsafe` code is used to make direct platform-specific calls!
19//!
20//! ## Examples
21//!
22//! Reading, writing, deleting and truncating:
23//!
24//! ```
25//! # tokio_test::block_on(async {
26//! # example().await;
27//! # });
28//! # async fn example() {
29//! use random_access_disk::RandomAccessDisk;
30//! use random_access_storage::RandomAccess;
31//!
32//! let path = tempfile::Builder::new()
33//!     .prefix("basic")
34//!     .tempfile()
35//!     .unwrap()
36//!     .into_temp_path();
37//! let storage = RandomAccessDisk::open(path.to_path_buf()).await.unwrap();
38//! storage.write(0, b"hello").await.unwrap();
39//! storage.write(5, b" world").await.unwrap();
40//! assert_eq!(storage.read(0, 11).await.unwrap(), b"hello world");
41//! assert_eq!(storage.len(), 11);
42//! storage.del(5, 2).await.unwrap();
43//! assert_eq!(storage.read(5, 2).await.unwrap(), [0, 0]);
44//! assert_eq!(storage.len(), 11);
45//! storage.truncate(2).await.unwrap();
46//! assert_eq!(storage.len(), 2);
47//! storage.truncate(5).await.unwrap();
48//! assert_eq!(storage.len(), 5);
49//! assert_eq!(storage.read(0, 5).await.unwrap(), [b'h', b'e', 0, 0, 0]);
50//! # }
51//! ```
52//!
53//! In order to get benefits from the swappable interface, you will
54//! in most cases want to use generic functions for storage manipulation:
55//!
56//! ```
57//! # tokio_test::block_on(async {
58//! # example().await;
59//! # });
60//! # async fn example() {
61//! use random_access_storage::RandomAccess;
62//! use random_access_disk::RandomAccessDisk;
63//! use std::fmt::Debug;
64//!
65//! let path = tempfile::Builder::new().prefix("swappable").tempfile().unwrap().into_temp_path();
66//! let storage = RandomAccessDisk::open(path.to_path_buf()).await.unwrap();
67//! write_hello_world(&storage).await;
68//! assert_eq!(read_hello_world(&storage).await, b"hello world");
69//!
70//! /// Write with swappable storage
71//! async fn write_hello_world<T>(storage: &T)
72//! where T: RandomAccess + Debug + Send,
73//! {
74//!   storage.write(0, b"hello").await.unwrap();
75//!   storage.write(5, b" world").await.unwrap();
76//! }
77//!
78//! /// Read with swappable storage
79//! async fn read_hello_world<T>(storage: &T) -> Vec<u8>
80//! where T: RandomAccess + Debug + Send,
81//! {
82//!   storage.read(0, 11).await.unwrap()
83//! }
//! # }
//! ```
85
86use async_lock::Mutex;
87use random_access_storage::{BoxFuture, RandomAccess, RandomAccessError};
88use std::{
89    io::SeekFrom,
90    path,
91    sync::{
92        Arc,
93        atomic::{AtomicU64, Ordering},
94    },
95};
96use tokio::{
97    fs::{self, OpenOptions},
98    io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
99};
100
101#[cfg(all(
102    feature = "sparse",
103    any(
104        target_os = "linux",
105        target_os = "android",
106        target_os = "freebsd",
107        target_os = "macos",
108    )
109))]
110mod unix;
111#[cfg(all(
112    feature = "sparse",
113    any(
114        target_os = "linux",
115        target_os = "android",
116        target_os = "freebsd",
117        target_os = "macos",
118    )
119))]
120use unix::{get_length_and_block_size, set_sparse, trim};
121
122#[cfg(all(feature = "sparse", windows))]
123mod windows;
124#[cfg(all(feature = "sparse", windows))]
125use windows::{get_length_and_block_size, set_sparse, trim};
126
127#[cfg(not(all(
128    feature = "sparse",
129    any(
130        target_os = "linux",
131        target_os = "android",
132        target_os = "freebsd",
133        target_os = "macos",
134        windows,
135    )
136)))]
137mod default;
138
139#[cfg(not(all(
140    feature = "sparse",
141    any(
142        target_os = "linux",
143        target_os = "android",
144        target_os = "freebsd",
145        target_os = "macos",
146        windows,
147    )
148)))]
149use default::{get_length_and_block_size, set_sparse, trim};
150
151/// Internal mutable state of [RandomAccessDisk].
152#[derive(Debug)]
153struct DiskInner {
154    file: Option<fs::File>,
155    length: u64,
156    block_size: u64,
157    auto_sync: bool,
158}
159
160impl DiskInner {
161    async fn do_truncate(&mut self, length: u64) -> Result<(), RandomAccessError> {
162        self.length = length;
163        let auto_sync = self.auto_sync;
164        let file = self.file.as_ref().expect("self.file was None.");
165        file.set_len(length).await?;
166        if auto_sync {
167            file.sync_all().await?;
168        }
169        Ok(())
170    }
171}
172
173/// Main constructor.
174#[derive(Debug, Clone)]
175pub struct RandomAccessDisk {
176    #[allow(dead_code)]
177    filename: path::PathBuf,
178    inner: Arc<Mutex<DiskInner>>,
179    /// Cached length for synchronous reads via [`RandomAccess::len`].
180    length: Arc<AtomicU64>,
181}
182
183impl RandomAccessDisk {
184    /// Create a new (auto-sync) instance to storage at `filename`.
185    #[allow(clippy::new_ret_no_self)]
186    pub async fn open(
187        filename: impl AsRef<path::Path>,
188    ) -> Result<RandomAccessDisk, RandomAccessError> {
189        Self::builder(filename).build().await
190    }
191
192    /// Initialize a builder with storage at `filename`.
193    pub fn builder(filename: impl AsRef<path::Path>) -> Builder {
194        Builder::new(filename)
195    }
196}
197
198impl RandomAccess for RandomAccessDisk {
199    fn write(&self, offset: u64, data: &[u8]) -> BoxFuture<Result<(), RandomAccessError>> {
200        let inner = self.inner.clone();
201        let length_arc = Arc::clone(&self.length);
202        let data = data.to_vec();
203        Box::pin(async move {
204            let mut inner = inner.lock().await;
205            let auto_sync = inner.auto_sync;
206            let new_len = offset + (data.len() as u64);
207            {
208                let file = inner.file.as_mut().expect("self.file was None.");
209                file.seek(SeekFrom::Start(offset)).await?;
210                file.write_all(&data).await?;
211                if auto_sync {
212                    file.sync_all().await?;
213                }
214            }
215            if new_len > inner.length {
216                inner.length = new_len;
217                length_arc.store(new_len, Ordering::Relaxed);
218            }
219            Ok(())
220        })
221    }
222
223    // NOTE(yw): disabling clippy here because we files on disk might be sparse,
224    // and sometimes you might want to read a bit of memory to check if it's
225    // formatted or not. Returning zero'd out memory seems like an OK thing to do.
226    // We should probably come back to this at a future point, and determine
227    // whether it's okay to return a fully zero'd out slice. It's a bit weird,
228    // because we're replacing empty data with actual zeroes - which does not
229    // reflect the state of the world.
230    // #[cfg_attr(test, allow(unused_io_amount))]
231    fn read(&self, offset: u64, length: u64) -> BoxFuture<Result<Vec<u8>, RandomAccessError>> {
232        let inner = self.inner.clone();
233        Box::pin(async move {
234            let mut guard = inner.lock().await;
235            let stored_length = guard.length;
236            if offset + length > stored_length {
237                return Err(RandomAccessError::OutOfBounds {
238                    offset,
239                    end: Some(offset + length),
240                    length: stored_length,
241                });
242            }
243            let file = guard.file.as_mut().expect("self.file was None.");
244            let mut buffer = vec![0; length as usize];
245            file.seek(SeekFrom::Start(offset)).await?;
246            let _bytes_read = file.read(&mut buffer[..]).await?;
247            Ok(buffer)
248        })
249    }
250
251    fn del(&self, offset: u64, length: u64) -> BoxFuture<Result<(), RandomAccessError>> {
252        let inner = self.inner.clone();
253        let length_arc = Arc::clone(&self.length);
254        Box::pin(async move {
255            let mut inner = inner.lock().await;
256            if offset > inner.length {
257                return Err(RandomAccessError::OutOfBounds {
258                    offset,
259                    end: None,
260                    length: inner.length,
261                });
262            };
263
264            if length == 0 {
265                // No-op
266                return Ok(());
267            }
268
269            // Delete is truncate if up to the current length or more is deleted
270            if offset + length >= inner.length {
271                inner.do_truncate(offset).await?;
272                length_arc.store(offset, Ordering::Relaxed);
273                return Ok(());
274            }
275
276            let auto_sync = inner.auto_sync;
277            let block_size = inner.block_size;
278            let file = inner.file.as_mut().expect("self.file was None.");
279            trim(file, offset, length, block_size).await?;
280            if auto_sync {
281                file.sync_all().await?;
282            }
283            Ok(())
284        })
285    }
286
287    fn truncate(&self, length: u64) -> BoxFuture<Result<(), RandomAccessError>> {
288        let inner = self.inner.clone();
289        let length_arc = Arc::clone(&self.length);
290        Box::pin(async move {
291            let mut inner = inner.lock().await;
292            inner.do_truncate(length).await?;
293            length_arc.store(length, Ordering::Relaxed);
294            Ok(())
295        })
296    }
297
298    fn len(&self) -> u64 {
299        self.length.load(Ordering::Relaxed)
300    }
301
302    fn sync_all(&self) -> BoxFuture<Result<(), RandomAccessError>> {
303        let inner = self.inner.clone();
304        Box::pin(async move {
305            let inner = inner.lock().await;
306            if !inner.auto_sync {
307                let file = inner.file.as_ref().expect("self.file was None.");
308                file.sync_all().await?;
309            }
310            Ok(())
311        })
312    }
313}
314
/// Builder for [RandomAccessDisk].
#[derive(Debug, Clone)]
pub struct Builder {
    /// Path of the backing file.
    filename: path::PathBuf,
    /// Whether the built storage syncs after every mutating operation.
    auto_sync: bool,
}
320
321impl Builder {
322    /// Create new builder at `path` (with auto-sync true by default).
323    pub fn new(filename: impl AsRef<path::Path>) -> Self {
324        Self {
325            filename: filename.as_ref().into(),
326            auto_sync: true,
327        }
328    }
329
330    /// Set auto-sync.
331    // NB: tokio cannot flush on drop (no AsyncDrop yet), so disabling auto_sync
332    // means you must call sync_all() explicitly before the value is dropped.
333    pub fn auto_sync(mut self, auto_sync: bool) -> Self {
334        self.auto_sync = auto_sync;
335        self
336    }
337
338    /// Build a [RandomAccessDisk] instance
339    pub async fn build(self) -> Result<RandomAccessDisk, RandomAccessError> {
340        if let Some(dirname) = self.filename.parent() {
341            mkdirp::mkdirp(dirname)?;
342        }
343        let mut file = OpenOptions::new()
344            .create(true)
345            .truncate(false)
346            .read(true)
347            .write(true)
348            .open(&self.filename)
349            .await?;
350        file.sync_all().await?;
351
352        set_sparse(&mut file).await?;
353
354        let (length, block_size) = get_length_and_block_size(&file).await?;
355        let length_arc = Arc::new(AtomicU64::new(length));
356        Ok(RandomAccessDisk {
357            filename: self.filename,
358            inner: Arc::new(Mutex::new(DiskInner {
359                file: Some(file),
360                length,
361                auto_sync: self.auto_sync,
362                block_size,
363            })),
364            length: length_arc,
365        })
366    }
367}