1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
#![forbid(missing_docs)]
#![cfg_attr(test, deny(warnings))]
#![doc(test(attr(deny(warnings))))]

//! # Continuously read and write to disk, using random offsets and lengths
//! [RandomAccessDisk] is a complete implementation of [random-access-storage](https://docs.rs/random-access-storage)
//! for on-disk storage.
//!
//! See also [random-access-memory](https://docs.rs/random-access-memory) for in-memory storage
//! that can be swapped with this.
//!
//! ## Features
//!
//! ### `sparse` (default)
//!
//! With this feature, deleting may create sparse files; on by default. Creation of sparse files is tested on macOS, Linux and Windows.
//!
//! **NB**: If this is on, `unsafe` code is used to make direct platform-specific calls!
//!
//! ### `async-std` (default)
//!
//! Use the async-std runtime; on by default. Exactly one of this and `tokio` must be enabled.
//!
//! ### `tokio`
//!
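//! Use the tokio runtime. Exactly one of this and `async-std` must be enabled, so disable the
//! default features to use it. Note that with tokio, auto-sync cannot be turned off, because
//! changes cannot be synced when the storage is dropped.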
//!
//! ## Examples
//!
//! Reading, writing, deleting and truncating:
//!
//! ```
//! # #[cfg(feature = "tokio")]
//! # tokio_test::block_on(async {
//! # example().await;
//! # });
//! # #[cfg(feature = "async-std")]
//! # async_std::task::block_on(async {
//! # example().await;
//! # });
//! # async fn example() {
//! use random_access_storage::RandomAccess;
//! use random_access_disk::RandomAccessDisk;
//!
//! let path = tempfile::Builder::new().prefix("basic").tempfile().unwrap().into_temp_path();
//! let mut storage = RandomAccessDisk::open(path.to_path_buf()).await.unwrap();
//! storage.write(0, b"hello").await.unwrap();
//! storage.write(5, b" world").await.unwrap();
//! assert_eq!(storage.read(0, 11).await.unwrap(), b"hello world");
//! assert_eq!(storage.len().await.unwrap(), 11);
//! storage.del(5, 2).await.unwrap();
//! assert_eq!(storage.read(5, 2).await.unwrap(), [0, 0]);
//! assert_eq!(storage.len().await.unwrap(), 11);
//! storage.truncate(2).await.unwrap();
//! assert_eq!(storage.len().await.unwrap(), 2);
//! storage.truncate(5).await.unwrap();
//! assert_eq!(storage.len().await.unwrap(), 5);
//! assert_eq!(storage.read(0, 5).await.unwrap(), [b'h', b'e', 0, 0, 0]);
//! # }
//! ```
//!
//! In order to get benefits from the swappable interface, you will
//! in most cases want to use generic functions for storage manipulation:
//!
//! ```
//! # #[cfg(feature = "tokio")]
//! # tokio_test::block_on(async {
//! # example().await;
//! # });
//! # #[cfg(feature = "async-std")]
//! # async_std::task::block_on(async {
//! # example().await;
//! # });
//! # async fn example() {
//! use random_access_storage::RandomAccess;
//! use random_access_disk::RandomAccessDisk;
//! use std::fmt::Debug;
//!
//! let path = tempfile::Builder::new().prefix("swappable").tempfile().unwrap().into_temp_path();
//! let mut storage = RandomAccessDisk::open(path.to_path_buf()).await.unwrap();
//! write_hello_world(&mut storage).await;
//! assert_eq!(read_hello_world(&mut storage).await, b"hello world");
//!
//! /// Write with swappable storage
//! async fn write_hello_world<T>(storage: &mut T)
//! where T: RandomAccess + Debug + Send,
//! {
//!   storage.write(0, b"hello").await.unwrap();
//!   storage.write(5, b" world").await.unwrap();
//! }
//!
//! /// Read with swappable storage
//! async fn read_hello_world<T>(storage: &mut T) -> Vec<u8>
//! where T: RandomAccess + Debug + Send,
//! {
//!   storage.read(0, 11).await.unwrap()
//! }
//! # }
//! ```

// Exactly one async runtime must be selected. Fail the build with a clear
// message when neither runtime feature is enabled...
#[cfg(not(any(feature = "async-std", feature = "tokio")))]
compile_error!(
  "Either feature `random-access-disk/async-std` or `random-access-disk/tokio` must be enabled."
);

// ...and also when both are enabled at the same time.
#[cfg(all(feature = "async-std", feature = "tokio"))]
compile_error!("features `random-access-disk/async-std` and `random-access-disk/tokio` are mutually exclusive");

#[cfg(feature = "async-std")]
use async_std::{
  fs::{self, OpenOptions},
  io::prelude::{SeekExt, WriteExt},
  io::{ReadExt, SeekFrom},
};
use random_access_storage::{RandomAccess, RandomAccessError};
use std::ops::Drop;
use std::path;

#[cfg(feature = "tokio")]
use std::io::SeekFrom;
#[cfg(feature = "tokio")]
use tokio::{
  fs::{self, OpenOptions},
  io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt},
};

#[cfg(all(
  feature = "sparse",
  any(
    target_os = "linux",
    target_os = "android",
    target_os = "freebsd",
    target_os = "macos",
  )
))]
mod unix;
#[cfg(all(
  feature = "sparse",
  any(
    target_os = "linux",
    target_os = "android",
    target_os = "freebsd",
    target_os = "macos",
  )
))]
use unix::{get_length_and_block_size, set_sparse, trim};

#[cfg(all(feature = "sparse", windows))]
mod windows;
#[cfg(all(feature = "sparse", windows))]
use windows::{get_length_and_block_size, set_sparse, trim};

#[cfg(not(all(
  feature = "sparse",
  any(
    target_os = "linux",
    target_os = "android",
    target_os = "freebsd",
    target_os = "macos",
    windows,
  )
)))]
mod default;

#[cfg(not(all(
  feature = "sparse",
  any(
    target_os = "linux",
    target_os = "android",
    target_os = "freebsd",
    target_os = "macos",
    windows,
  )
)))]
use default::{get_length_and_block_size, set_sparse, trim};

/// Random-access storage backed by a single file on disk.
///
/// Create an instance with [RandomAccessDisk::open] or
/// [RandomAccessDisk::builder], then use it through the [RandomAccess] trait.
#[derive(Debug)]
pub struct RandomAccessDisk {
  /// Path of the backing file.
  // Not read by the code in this file (only surfaced through `Debug`), hence
  // the `dead_code` allowance.
  #[allow(dead_code)]
  filename: path::PathBuf,
  /// Open handle to the backing file; always `Some` for a built instance.
  file: Option<fs::File>,
  /// Length of the storage in bytes, as tracked by this instance.
  length: u64,
  /// Block size reported by the platform, passed to `trim` when deleting.
  block_size: u64,
  /// Whether every mutating operation is followed by a `sync_all`.
  auto_sync: bool,
}

impl RandomAccessDisk {
  /// Open storage at `filename` with the default settings (auto-sync enabled).
  ///
  /// The file and any missing parent directories are created if they do not
  /// exist yet. This is shorthand for
  /// `RandomAccessDisk::builder(filename).build().await`.
  ///
  /// # Errors
  ///
  /// Returns an error if the parent directories cannot be created, or if the
  /// file cannot be opened, synced or inspected.
  pub async fn open(
    filename: impl AsRef<path::Path>,
  ) -> Result<RandomAccessDisk, RandomAccessError> {
    Self::builder(filename).build().await
  }

  /// Create a [Builder] for storage at `filename`, so the instance can be
  /// configured before it is opened.
  pub fn builder(filename: impl AsRef<path::Path>) -> Builder {
    Builder::new(filename)
  }
}

#[async_trait::async_trait]
impl RandomAccess for RandomAccessDisk {
  /// Write `data` at `offset`, extending the storage length if the write ends
  /// past the current end. With auto-sync enabled, the file is synced before
  /// returning.
  async fn write(
    &mut self,
    offset: u64,
    data: &[u8],
  ) -> Result<(), RandomAccessError> {
    let file = self.file.as_mut().expect("self.file was None.");
    file.seek(SeekFrom::Start(offset)).await?;
    file.write_all(data).await?;

    // Record the new end as soon as the bytes are written, so the tracked
    // length stays accurate even if the optional sync below fails.
    // `saturating_add` rules out arithmetic overflow for extreme offsets.
    let end = offset.saturating_add(data.len() as u64);
    self.length = self.length.max(end);

    if self.auto_sync {
      file.sync_all().await?;
    }
    Ok(())
  }

  /// Read `length` bytes starting at `offset`.
  ///
  /// Fails with `OutOfBounds` if the range extends past the current length.
  // NOTE(yw): files on disk might be sparse, and sometimes you might want to
  // read a bit of memory to check whether it is formatted. If the file turns
  // out to be shorter than expected, the missing tail is returned as zeroes
  // rather than as an error. That does not strictly reflect the state on disk,
  // and it may be worth revisiting.
  async fn read(
    &mut self,
    offset: u64,
    length: u64,
  ) -> Result<Vec<u8>, RandomAccessError> {
    // `checked_add` keeps a huge `offset + length` from wrapping around and
    // slipping through the bounds check in release builds (or panicking in
    // debug builds).
    let within_bounds =
      matches!(offset.checked_add(length), Some(end) if end <= self.length);
    if !within_bounds {
      return Err(RandomAccessError::OutOfBounds {
        offset,
        end: Some(offset.saturating_add(length)),
        length: self.length,
      });
    }

    let file = self.file.as_mut().expect("self.file was None.");
    let mut buffer = vec![0; length as usize];
    file.seek(SeekFrom::Start(offset)).await?;

    // A single `read` call may legally return fewer bytes than requested, so
    // keep reading until the buffer is full. If EOF is hit first, the rest of
    // the buffer stays zeroed (see NOTE above).
    let mut filled = 0;
    while filled < buffer.len() {
      match file.read(&mut buffer[filled..]).await {
        Ok(0) => break,
        Ok(n) => filled += n,
        Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
        Err(e) => return Err(e.into()),
      }
    }
    Ok(buffer)
  }

  /// Delete `length` bytes starting at `offset`.
  ///
  /// A range that reaches or passes the end of the storage truncates it to
  /// `offset`. A range strictly inside the storage is trimmed via the
  /// platform-specific `trim`, and the length is unchanged. Deleting zero bytes
  /// is a no-op. Fails with `OutOfBounds` if `offset` is beyond the current
  /// length.
  async fn del(
    &mut self,
    offset: u64,
    length: u64,
  ) -> Result<(), RandomAccessError> {
    if offset > self.length {
      return Err(RandomAccessError::OutOfBounds {
        offset,
        end: None,
        length: self.length,
      });
    };

    if length == 0 {
      // No-op
      return Ok(());
    }

    // Delete is truncate if up to the current length or more is deleted.
    // `saturating_add` lets callers pass a huge `length` (e.g. `u64::MAX`) to
    // mean "to the end" without overflowing.
    if offset.saturating_add(length) >= self.length {
      return self.truncate(offset).await;
    }

    let file = self.file.as_mut().expect("self.file was None.");
    trim(file, offset, length, self.block_size).await?;
    if self.auto_sync {
      file.sync_all().await?;
    }
    Ok(())
  }

  /// Resize the storage to exactly `length` bytes, shrinking or extending the
  /// file. Extended bytes read back as zeroes.
  async fn truncate(&mut self, length: u64) -> Result<(), RandomAccessError> {
    let file = self.file.as_ref().expect("self.file was None.");
    file.set_len(length).await?;
    // Only record the new length once the file has actually been resized.
    self.length = length;
    if self.auto_sync {
      file.sync_all().await?;
    }
    Ok(())
  }

  /// Current length of the storage in bytes.
  async fn len(&mut self) -> Result<u64, RandomAccessError> {
    Ok(self.length)
  }

  /// Whether the storage currently has zero length.
  async fn is_empty(&mut self) -> Result<bool, RandomAccessError> {
    Ok(self.length == 0)
  }

  /// Sync pending changes to disk. With auto-sync enabled, every mutating
  /// operation has already synced, so this does nothing.
  async fn sync_all(&mut self) -> Result<(), RandomAccessError> {
    if !self.auto_sync {
      let file = self.file.as_ref().expect("self.file was None.");
      file.sync_all().await?;
    }
    Ok(())
  }
}

impl Drop for RandomAccessDisk {
  /// Best-effort sync of outstanding changes when the storage goes away.
  fn drop(&mut self) {
    // async-std: a destructor cannot `.await`, so block on `sync_all` instead.
    // This is the same trade-off `async_std::fs::File::drop` makes. Briefly
    // blocking the executor is preferable to losing data still in the write
    // cache. The result is deliberately discarded because `drop` has no way to
    // report it.
    #[cfg(feature = "async-std")]
    if let Some(ref file) = self.file {
      let _ = async_std::task::block_on(file.sync_all());
    }

    // tokio: nothing is done here. Blocking on `sync_all` through
    // `tokio::runtime::Handle::current().block_on(..)` fails with "Cannot
    // start a runtime from within a runtime" when dropped on a runtime thread.
    // There is no working synchronous equivalent, so syncing on drop has to
    // wait for a real AsyncDrop. This is also why `Builder::auto_sync` is not
    // available with tokio.
  }
}

/// Builder for [RandomAccessDisk], created with [RandomAccessDisk::builder]
/// or [Builder::new].
#[derive(Debug)]
pub struct Builder {
  /// Path of the file to open (created if missing).
  filename: path::PathBuf,
  /// Whether the built instance syncs after every mutating operation.
  auto_sync: bool,
}

impl Builder {
  /// Create new builder at `path` (with auto-sync true by default).
  pub fn new(filename: impl AsRef<path::Path>) -> Self {
    Self {
      filename: filename.as_ref().into(),
      auto_sync: true,
    }
  }

  /// Set auto-sync
  // NB: Because of no AsyncDrop, tokio can not ensure that changes are synced when dropped,
  // see impl Drop above.
  #[cfg(feature = "async-std")]
  pub fn auto_sync(mut self, auto_sync: bool) -> Self {
    self.auto_sync = auto_sync;
    self
  }

  /// Build a [RandomAccessDisk] instance
  pub async fn build(self) -> Result<RandomAccessDisk, RandomAccessError> {
    if let Some(dirname) = self.filename.parent() {
      mkdirp::mkdirp(dirname)?;
    }
    let mut file = OpenOptions::new()
      .create(true)
      .read(true)
      .write(true)
      .open(&self.filename)
      .await?;
    file.sync_all().await?;

    set_sparse(&mut file).await?;

    let (length, block_size) = get_length_and_block_size(&file).await?;
    Ok(RandomAccessDisk {
      filename: self.filename,
      file: Some(file),
      length,
      auto_sync: self.auto_sync,
      block_size,
    })
  }
}