random_access_disk/lib.rs
1#![forbid(missing_docs)]
2#![cfg_attr(test, deny(warnings))]
3#![doc(test(attr(deny(warnings))))]
4
5//! # Continuously read and write to disk, using random offsets and lengths
6//! [RandomAccessDisk] is a complete implementation of [random-access-storage](https://docs.rs/random-access-storage)
7//! for in-memory storage.
8//!
9//! See also [random-access-memory](https://docs.rs/random-access-memory) for in-memory storage
10//! that can be swapped with this.
11//!
12//! ## Features
13//!
14//! ### `sparse` (default)
15//!
16//! Deleting may create sparse files, on by default. Creation of sparse files is tested on OSX, linux and Windows.
17//!
18//! **NB**: If this is on, `unsafe` code is used to make direct platform-specific calls!
19//!
20//! ### `async-std` (default)
21//!
22//! Use the async-std runtime, on by default. Either this or `tokio` is mandatory.
23//!
24//! ### `tokio`
25//!
26//! Use the tokio runtime. Either this or `async_std` is mandatory.
27//!
28//! ## Examples
29//!
30//! Reading, writing, deleting and truncating:
31//!
32//! ```
33//! # #[cfg(feature = "tokio")]
34//! # tokio_test::block_on(async {
35//! # example().await;
36//! # });
37//! # #[cfg(feature = "async-std")]
38//! # async_std::task::block_on(async {
39//! # example().await;
40//! # });
41//! # async fn example() {
42//! use random_access_storage::RandomAccess;
43//! use random_access_disk::RandomAccessDisk;
44//!
45//! let path = tempfile::Builder::new().prefix("basic").tempfile().unwrap().into_temp_path();
46//! let mut storage = RandomAccessDisk::open(path.to_path_buf()).await.unwrap();
47//! storage.write(0, b"hello").await.unwrap();
48//! storage.write(5, b" world").await.unwrap();
49//! assert_eq!(storage.read(0, 11).await.unwrap(), b"hello world");
50//! assert_eq!(storage.len().await.unwrap(), 11);
51//! storage.del(5, 2).await.unwrap();
52//! assert_eq!(storage.read(5, 2).await.unwrap(), [0, 0]);
53//! assert_eq!(storage.len().await.unwrap(), 11);
54//! storage.truncate(2).await.unwrap();
55//! assert_eq!(storage.len().await.unwrap(), 2);
56//! storage.truncate(5).await.unwrap();
57//! assert_eq!(storage.len().await.unwrap(), 5);
58//! assert_eq!(storage.read(0, 5).await.unwrap(), [b'h', b'e', 0, 0, 0]);
59//! # }
60//! ```
61//!
62//! In order to get benefits from the swappable interface, you will
63//! in most cases want to use generic functions for storage manipulation:
64//!
65//! ```
66//! # #[cfg(feature = "tokio")]
67//! # tokio_test::block_on(async {
68//! # example().await;
69//! # });
70//! # #[cfg(feature = "async-std")]
71//! # async_std::task::block_on(async {
72//! # example().await;
73//! # });
74//! # async fn example() {
75//! use random_access_storage::RandomAccess;
76//! use random_access_disk::RandomAccessDisk;
77//! use std::fmt::Debug;
78//!
79//! let path = tempfile::Builder::new().prefix("swappable").tempfile().unwrap().into_temp_path();
80//! let mut storage = RandomAccessDisk::open(path.to_path_buf()).await.unwrap();
81//! write_hello_world(&mut storage).await;
82//! assert_eq!(read_hello_world(&mut storage).await, b"hello world");
83//!
84//! /// Write with swappable storage
85//! async fn write_hello_world<T>(storage: &mut T)
86//! where T: RandomAccess + Debug + Send,
87//! {
88//! storage.write(0, b"hello").await.unwrap();
89//! storage.write(5, b" world").await.unwrap();
90//! }
91//!
92//! /// Read with swappable storage
93//! async fn read_hello_world<T>(storage: &mut T) -> Vec<u8>
94//! where T: RandomAccess + Debug + Send,
95//! {
96//! storage.read(0, 11).await.unwrap()
97//! }
98//! # }
99
100#[cfg(not(any(feature = "async-std", feature = "tokio")))]
101compile_error!(
102 "Either feature `random-access-disk/async-std` or `random-access-disk/tokio` must be enabled."
103);
104
105#[cfg(all(feature = "async-std", feature = "tokio"))]
106compile_error!("features `random-access-disk/async-std` and `random-access-disk/tokio` are mutually exclusive");
107
108#[cfg(feature = "async-std")]
109use async_std::{
110 fs::{self, OpenOptions},
111 io::prelude::{SeekExt, WriteExt},
112 io::{ReadExt, SeekFrom},
113};
114use random_access_storage::{RandomAccess, RandomAccessError};
115use std::ops::Drop;
116use std::path;
117
118#[cfg(feature = "tokio")]
119use std::io::SeekFrom;
120#[cfg(feature = "tokio")]
121use tokio::{
122 fs::{self, OpenOptions},
123 io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
124};
125
126#[cfg(all(
127 feature = "sparse",
128 any(
129 target_os = "linux",
130 target_os = "android",
131 target_os = "freebsd",
132 target_os = "macos",
133 )
134))]
135mod unix;
136#[cfg(all(
137 feature = "sparse",
138 any(
139 target_os = "linux",
140 target_os = "android",
141 target_os = "freebsd",
142 target_os = "macos",
143 )
144))]
145use unix::{get_length_and_block_size, set_sparse, trim};
146
147#[cfg(all(feature = "sparse", windows))]
148mod windows;
149#[cfg(all(feature = "sparse", windows))]
150use windows::{get_length_and_block_size, set_sparse, trim};
151
152#[cfg(not(all(
153 feature = "sparse",
154 any(
155 target_os = "linux",
156 target_os = "android",
157 target_os = "freebsd",
158 target_os = "macos",
159 windows,
160 )
161)))]
162mod default;
163
164#[cfg(not(all(
165 feature = "sparse",
166 any(
167 target_os = "linux",
168 target_os = "android",
169 target_os = "freebsd",
170 target_os = "macos",
171 windows,
172 )
173)))]
174use default::{get_length_and_block_size, set_sparse, trim};
175
176/// Main constructor.
177#[derive(Debug)]
178pub struct RandomAccessDisk {
179 #[allow(dead_code)]
180 filename: path::PathBuf,
181 file: Option<fs::File>,
182 length: u64,
183 block_size: u64,
184 auto_sync: bool,
185}
186
187impl RandomAccessDisk {
188 /// Create a new (auto-sync) instance to storage at `filename`.
189 #[allow(clippy::new_ret_no_self)]
190 pub async fn open(
191 filename: impl AsRef<path::Path>,
192 ) -> Result<RandomAccessDisk, RandomAccessError> {
193 Self::builder(filename).build().await
194 }
195
196 /// Initialize a builder with storage at `filename`.
197 pub fn builder(filename: impl AsRef<path::Path>) -> Builder {
198 Builder::new(filename)
199 }
200}
201
202#[async_trait::async_trait]
203impl RandomAccess for RandomAccessDisk {
204 async fn write(
205 &mut self,
206 offset: u64,
207 data: &[u8],
208 ) -> Result<(), RandomAccessError> {
209 let file = self.file.as_mut().expect("self.file was None.");
210 file.seek(SeekFrom::Start(offset)).await?;
211 file.write_all(data).await?;
212 if self.auto_sync {
213 file.sync_all().await?;
214 }
215
216 // We've changed the length of our file.
217 let new_len = offset + (data.len() as u64);
218 if new_len > self.length {
219 self.length = new_len;
220 }
221
222 Ok(())
223 }
224
225 // NOTE(yw): disabling clippy here because we files on disk might be sparse,
226 // and sometimes you might want to read a bit of memory to check if it's
227 // formatted or not. Returning zero'd out memory seems like an OK thing to do.
228 // We should probably come back to this at a future point, and determine
229 // whether it's okay to return a fully zero'd out slice. It's a bit weird,
230 // because we're replacing empty data with actual zeroes - which does not
231 // reflect the state of the world.
232 // #[cfg_attr(test, allow(unused_io_amount))]
233 async fn read(
234 &mut self,
235 offset: u64,
236 length: u64,
237 ) -> Result<Vec<u8>, RandomAccessError> {
238 if offset + length > self.length {
239 return Err(RandomAccessError::OutOfBounds {
240 offset,
241 end: Some(offset + length),
242 length: self.length,
243 });
244 }
245
246 let file = self.file.as_mut().expect("self.file was None.");
247 let mut buffer = vec![0; length as usize];
248 file.seek(SeekFrom::Start(offset)).await?;
249 let _bytes_read = file.read(&mut buffer[..]).await?;
250 Ok(buffer)
251 }
252
253 async fn del(
254 &mut self,
255 offset: u64,
256 length: u64,
257 ) -> Result<(), RandomAccessError> {
258 if offset > self.length {
259 return Err(RandomAccessError::OutOfBounds {
260 offset,
261 end: None,
262 length: self.length,
263 });
264 };
265
266 if length == 0 {
267 // No-op
268 return Ok(());
269 }
270
271 // Delete is truncate if up to the current length or more is deleted
272 if offset + length >= self.length {
273 return self.truncate(offset).await;
274 }
275
276 let file = self.file.as_mut().expect("self.file was None.");
277 trim(file, offset, length, self.block_size).await?;
278 if self.auto_sync {
279 file.sync_all().await?;
280 }
281 Ok(())
282 }
283
284 async fn truncate(&mut self, length: u64) -> Result<(), RandomAccessError> {
285 let file = self.file.as_ref().expect("self.file was None.");
286 self.length = length;
287 file.set_len(self.length).await?;
288 if self.auto_sync {
289 file.sync_all().await?;
290 }
291 Ok(())
292 }
293
294 async fn len(&mut self) -> Result<u64, RandomAccessError> {
295 Ok(self.length)
296 }
297
298 async fn is_empty(&mut self) -> Result<bool, RandomAccessError> {
299 Ok(self.length == 0)
300 }
301
302 async fn sync_all(&mut self) -> Result<(), RandomAccessError> {
303 if !self.auto_sync {
304 let file = self.file.as_ref().expect("self.file was None.");
305 file.sync_all().await?;
306 }
307 Ok(())
308 }
309}
310
311impl Drop for RandomAccessDisk {
312 fn drop(&mut self) {
313 // We need to flush the file on drop. Unfortunately, that is not possible to do in a
314 // non-blocking fashion, but our only other option here is losing data remaining in the
315 // write cache. Good task schedulers should be resilient to occasional blocking hiccups in
316 // file destructors so we don't expect this to be a common problem in practice.
317 // (from async_std::fs::File::drop)
318 #[cfg(feature = "async-std")]
319 if let Some(file) = &self.file {
320 let _ = async_std::task::block_on(file.sync_all());
321 }
322 // For tokio, the below errors with:
323 //
324 // "Cannot start a runtime from within a runtime. This happens because a function (like
325 // `block_on`) attempted to block the current thread while the thread is being used to
326 // drive asynchronous tasks."
327 //
328 // There doesn't seem to be an equivalent block_on version for tokio that actually works
329 // in a sync drop(), so for tokio, we'll need to wait for a real AsyncDrop to arrive.
330 //
331 // #[cfg(feature = "tokio")]
332 // if let Some(file) = &self.file {
333 // tokio::runtime::Handle::current()
334 // .block_on(file.sync_all())
335 // .expect("Could not sync file changes on drop.");
336 // }
337 }
338}
339
340/// Builder for [RandomAccessDisk]
341pub struct Builder {
342 filename: path::PathBuf,
343 auto_sync: bool,
344}
345
346impl Builder {
347 /// Create new builder at `path` (with auto-sync true by default).
348 pub fn new(filename: impl AsRef<path::Path>) -> Self {
349 Self {
350 filename: filename.as_ref().into(),
351 auto_sync: true,
352 }
353 }
354
355 /// Set auto-sync
356 // NB: Because of no AsyncDrop, tokio can not ensure that changes are synced when dropped,
357 // see impl Drop above.
358 #[cfg(feature = "async-std")]
359 pub fn auto_sync(mut self, auto_sync: bool) -> Self {
360 self.auto_sync = auto_sync;
361 self
362 }
363
364 /// Build a [RandomAccessDisk] instance
365 pub async fn build(self) -> Result<RandomAccessDisk, RandomAccessError> {
366 if let Some(dirname) = self.filename.parent() {
367 mkdirp::mkdirp(dirname)?;
368 }
369 let mut file = OpenOptions::new()
370 .create(true)
371 .read(true)
372 .write(true)
373 .open(&self.filename)
374 .await?;
375 file.sync_all().await?;
376
377 set_sparse(&mut file).await?;
378
379 let (length, block_size) = get_length_and_block_size(&file).await?;
380 Ok(RandomAccessDisk {
381 filename: self.filename,
382 file: Some(file),
383 length,
384 auto_sync: self.auto_sync,
385 block_size,
386 })
387 }
388}