1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
pub mod file;
#[cfg(target_os = "linux")]
pub mod uring;

use async_trait::async_trait;
use bufpool::buf::Buf;
use off64::u64;
use std::sync::Arc;

/*

Why use three different types instead of simply one?
- Offsets: lots of subtleties due to different levels of offsets, with different interpretations by different components. For example, the journal always writes in offsets relative to the partition, but if there's only one type, then a bounded type only knows about itself and the root (i.e. device), giving incorrect offsets.
- Names: similar to the previous point, the type name gets quite confusing, as some are used at the partition level while others are bounded to a specific range, yet they would all share the same type and API.

Using a generic trait for BackingStore allows us to provide different variants for testing, in-memory, different platforms, user configurability, etc.

*/

/// Abstraction over the raw storage device (or a file/in-memory stand-in).
/// All offsets are absolute, i.e. relative to the start of the device itself.
#[async_trait]
pub(crate) trait BackingStore: Send + Sync {
  /// Reads `len` bytes starting at absolute `offset`.
  /// `offset` and `len` must be multiples of the underlying device's sector size.
  async fn read_at(&self, offset: u64, len: u64) -> Buf;

  /// Writes `data` starting at absolute `offset`.
  /// `offset` and `data.len()` must be multiples of the underlying device's sector size.
  /// Returns the original `data` so that it can be reused, if desired.
  async fn write_at(&self, offset: u64, data: Buf) -> Buf;

  /// Flushes all prior writes to durable storage.
  /// Even when using direct I/O, `fsync` is still necessary, as it ensures the device itself has flushed any internal caches.
  async fn sync(&self);
}

/// A view of one partition within a [`BackingStore`]. Offsets passed to its
/// methods are relative to the partition start, and are translated to
/// absolute device offsets before delegating to the backing store.
/// Cloning is cheap: the backing store is shared via `Arc`.
#[derive(Clone)]
pub(crate) struct PartitionStore {
  // Shared handle to the underlying device.
  backing_store: Arc<dyn BackingStore>,
  // Absolute offset of this partition within the backing store.
  offset: u64,
  // Length of this partition in bytes.
  len: u64,
}

impl PartitionStore {
  /// Creates a partition view over `backing_store` covering the byte range
  /// `[offset, offset + len)` in absolute device offsets.
  pub fn new(backing_store: Arc<dyn BackingStore>, offset: u64, len: u64) -> Self {
    Self {
      backing_store,
      offset,
      len,
    }
  }

  /// Absolute offset of this partition within the backing store.
  pub fn offset(&self) -> u64 {
    self.offset
  }

  /// Length of this partition in bytes.
  pub fn len(&self) -> u64 {
    self.len
  }

  /// Reads `len` bytes at `offset` (relative to the partition start).
  /// `offset` and `len` must be multiples of the underlying device's sector size.
  ///
  /// # Panics
  /// Panics if the requested range exceeds the partition bounds or overflows.
  pub async fn read_at(&self, offset: u64, len: u64) -> Buf {
    // checked_add: in release builds `offset + len` would wrap on overflow
    // (debug assertions off), silently bypassing the bounds check below and
    // allowing reads outside this partition.
    let end = offset.checked_add(len).expect("read range overflows u64");
    assert!(
      end <= self.len,
      "attempted to read at {} with length {} but partition has length {}",
      offset,
      len,
      self.len
    );
    self.backing_store.read_at(self.offset + offset, len).await
  }

  /// Writes `data` at `offset` (relative to the partition start).
  /// `offset` and `data.len()` must be multiples of the underlying device's sector size.
  ///
  /// # Panics
  /// Panics if the write range exceeds the partition bounds or overflows.
  pub async fn write_at(&self, offset: u64, data: Buf) {
    let len = u64!(data.len());
    // checked_add guards against wrap-around bypassing the bounds check in
    // release builds (see read_at).
    let end = offset.checked_add(len).expect("write range overflows u64");
    assert!(
      end <= self.len,
      "attempted to write at {} with length {} but partition has length {}",
      offset,
      len,
      self.len
    );
    self
      .backing_store
      .write_at(self.offset + offset, data)
      .await;
  }

  /// Flushes all prior writes on the underlying backing store.
  pub async fn sync(&self) {
    self.backing_store.sync().await;
  }

  /// Creates a sub-range view of this partition; `offset`/`len` are relative
  /// to the partition start.
  /// `offset` must be a multiple of the underlying device's sector size.
  ///
  /// # Panics
  /// Panics if the range exceeds the partition bounds or overflows.
  pub fn bounded(&self, offset: u64, len: u64) -> BoundedStore {
    // checked_add for the same overflow reason as read_at/write_at.
    let end = offset.checked_add(len).expect("bounded range overflows u64");
    assert!(end <= self.len);
    BoundedStore {
      partition_store: self.clone(),
      offset,
      len,
    }
  }
}

/// A view of a sub-range within a [`PartitionStore`]. Offsets passed to its
/// methods are relative to the bounded range's start; they are translated to
/// partition-relative offsets before delegating to the partition store.
#[derive(Clone)]
pub(crate) struct BoundedStore {
  // Cheap clone: shares the partition's Arc-backed store.
  partition_store: PartitionStore,
  // Offset of this range within the partition (partition-relative).
  offset: u64,
  // Length of this range in bytes.
  len: u64,
}

#[allow(unused)]
impl BoundedStore {
  /// Length of this bounded range in bytes.
  pub fn len(&self) -> u64 {
    self.len
  }

  /// Reads `len` bytes at `offset` (relative to this bounded range).
  /// `offset` and `len` must be multiples of the underlying device's sector size.
  ///
  /// # Panics
  /// Panics if the requested range exceeds the bounded range or overflows.
  pub async fn read_at(&self, offset: u64, len: u64) -> Buf {
    // checked_add: a wrapping `offset + len` in release builds would bypass
    // the bounds check and read outside this bounded range.
    let end = offset.checked_add(len).expect("read range overflows u64");
    assert!(end <= self.len);
    self
      .partition_store
      .read_at(self.offset + offset, len)
      .await
  }

  /// Writes `data` at `offset` (relative to this bounded range).
  /// `offset` and `data.len()` must be multiples of the underlying device's sector size.
  ///
  /// # Panics
  /// Panics if the write range exceeds the bounded range or overflows.
  pub async fn write_at(&self, offset: u64, data: Buf) {
    // checked_add guards the bounds check against overflow wrap-around in
    // release builds (see read_at).
    let end = offset
      .checked_add(u64!(data.len()))
      .expect("write range overflows u64");
    assert!(end <= self.len);
    self
      .partition_store
      .write_at(self.offset + offset, data)
      .await;
  }
}