write_journal/
lib.rs
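
//! A write-ahead journal. Transactions are serialised into a fixed-capacity
//! journal region on the device and persisted before their writes are applied
//! to their final offsets, so an interrupted application can be replayed by
//! `WriteJournal::recover`.
//!
//! Journal region layout (integers are big-endian):
//! - `[0, 32)`: BLAKE3 hash of everything from the length field onwards.
//! - `[32, 36)`: `u32` total byte length of the serialised entries.
//! - `[36, ..)`: entries, each serialised as `u64` device offset, `u32` data
//!   length, then the data bytes.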

use dashmap::DashMap;
use off64::int::Off64ReadInt;
use off64::int::Off64WriteMutInt;
use off64::usz;
use off64::Off64Read;
use off64::Off64WriteMut;
use seekable_async_file::SeekableAsyncFile;
use seekable_async_file::WriteRequest;
use signal_future::SignalFuture;
use signal_future::SignalFutureController;
use std::iter::once;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use tinybuf::TinyBuf;
use tokio::time::sleep;
use tracing::info;
use tracing::warn;

const OFFSETOF_HASH: u64 = 0;
const OFFSETOF_LEN: u64 = OFFSETOF_HASH + 32;
const OFFSETOF_ENTRIES: u64 = OFFSETOF_LEN + 4;

struct TransactionWrite {
  offset: u64,
  data: TinyBuf,
  is_overlay: bool,
}

pub struct Transaction {
  serial_no: u64,
  writes: Vec<TransactionWrite>,
  overlay: Arc<DashMap<u64, OverlayEntry>>,
}

impl Transaction {
  fn serialised_byte_len(&self) -> u64 {
    u64::try_from(
      self
        .writes
        .iter()
        .map(|w| 8 + 4 + w.data.len())
        .sum::<usize>(),
    )
    .unwrap()
  }

  // Use a generic so callers don't even need to `.into()` from Vec, array, etc.
  pub fn write<D: Into<TinyBuf>>(&mut self, offset: u64, data: D) -> &mut Self {
    let data = data.into();
    self.writes.push(TransactionWrite {
      offset,
      data,
      is_overlay: false,
    });
    self
  }

  /// WARNING: Use this function with caution; misuse can cause logic errors, cache incoherency, and memory leaks. Carefully read notes/Overlay.md before using the overlay.
  // Use a generic so callers don't even need to `.into()` from Vec, array, etc.
  pub fn write_with_overlay<D: Into<TinyBuf>>(&mut self, offset: u64, data: D) -> &mut Self {
    let data = data.into();
    self.overlay.insert(offset, OverlayEntry {
      data: data.clone(),
      serial_no: self.serial_no,
    });
    self.writes.push(TransactionWrite {
      offset,
      data,
      is_overlay: true,
    });
    self
  }
}

// We cannot evict an overlay entry after a commit loop iteration if the data at
// the offset has since been updated again via the overlay while that iteration
// was running. This is why we need to track `serial_no`. This mechanism requires
// slower one-by-one deletes by the commit loop, but allows much faster parallel
// overlay reads with a DashMap. The alternative would be a RwLock over two maps,
// one for each generation, swapping them around after each loop iteration.
// Note that eviction is never necessary for correctness (assuming the overlay is
// used correctly); it's done only to avoid leaking memory.
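// For example: transaction 5 overlays offset X; while the commit loop flushes
// transaction 5, transaction 9 overlays X again with newer data. After flushing,
// the loop evicts via `remove_if(&X, |_, e| e.serial_no <= 5)`, which leaves
// transaction 9's newer entry in place.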
struct OverlayEntry {
  data: TinyBuf,
  serial_no: u64,
}

pub struct WriteJournal {
  device: SeekableAsyncFile,
  offset: u64,
  capacity: u64,
  pending: DashMap<u64, (Transaction, SignalFutureController)>,
  next_txn_serial_no: AtomicU64,
  commit_delay: Duration,
  overlay: Arc<DashMap<u64, OverlayEntry>>,
}

impl WriteJournal {
  pub fn new(
    device: SeekableAsyncFile,
    offset: u64,
    capacity: u64,
    commit_delay: Duration,
  ) -> Self {
    assert!(capacity > OFFSETOF_ENTRIES && capacity <= u32::MAX.into());
    Self {
      device,
      offset,
      capacity,
      pending: Default::default(),
      next_txn_serial_no: AtomicU64::new(0),
      commit_delay,
      overlay: Default::default(),
    }
  }

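  /// Serialises an empty journal state: a zero-length entry list and a valid hash over the length field.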
  pub fn generate_blank_state(&self) -> Vec<u8> {
    let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
    raw.write_u32_be_at(OFFSETOF_LEN, 0u32);
    let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
    raw.write_at(OFFSETOF_HASH, hash.as_bytes());
    raw
  }

  #[cfg(any(feature = "io_mmap", feature = "io_file"))]
  pub async fn format_device(&self) {
    self
      .device
      .write_at(self.offset, self.generate_blank_state())
      .await;
  }

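  /// Replays any intact journal entries onto their recorded device offsets, then erases the journal. A corrupt journal (invalid length or hash) is not replayed, as a torn or damaged journal cannot be safely applied.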
  #[cfg(any(feature = "io_mmap", feature = "io_file"))]
  pub async fn recover(&self) {
    let mut raw = self
      .device
      .read_at(self.offset, OFFSETOF_ENTRIES)
      .await
      .to_vec();
    let len: u64 = raw.read_u32_be_at(OFFSETOF_LEN).into();
    if len > self.capacity - OFFSETOF_ENTRIES {
      warn!("journal is corrupt, has invalid length, skipping recovery");
      return;
    };
    raw.extend_from_slice(
      &self
        .device
        .read_at(self.offset + OFFSETOF_ENTRIES, len)
        .await,
    );
    let expected_hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
    let recorded_hash = &raw[..usz!(OFFSETOF_LEN)];
    if expected_hash.as_bytes() != recorded_hash {
      warn!("journal is corrupt, has invalid hash, skipping recovery");
      return;
    };
    if len == 0 {
      info!("journal is empty, no recovery necessary");
      return;
    };
    let mut recovered_entries_total = 0;
    let mut recovered_bytes_total = 0;
    // Entries start at OFFSETOF_ENTRIES and span `len` bytes.
    let mut journal_offset = OFFSETOF_ENTRIES;
    while journal_offset < OFFSETOF_ENTRIES + len {
      let offset = raw.read_u64_be_at(journal_offset);
      journal_offset += 8;
      let data_len = raw.read_u32_be_at(journal_offset);
      journal_offset += 4;
      let data = TinyBuf::from_slice(raw.read_at(journal_offset, data_len.into()));
      journal_offset += u64::from(data_len);
      self.device.write_at(offset, data).await;
      recovered_entries_total += 1;
      recovered_bytes_total += data_len;
    }
    self.device.sync_data().await;

    // WARNING: Make sure to sync writes BEFORE erasing the journal.
    self
      .device
      .write_at(self.offset, self.generate_blank_state())
      .await;
    self.device.sync_data().await;
    info!(
      recovered_entries = recovered_entries_total,
      recovered_bytes = recovered_bytes_total,
      "journal has been recovered"
    );
  }

  pub fn begin_transaction(&self) -> Transaction {
    let serial_no = self
      .next_txn_serial_no
      .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    Transaction {
      serial_no,
      writes: Vec::new(),
      overlay: self.overlay.clone(),
    }
  }

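  /// Commits `txn` via the background loop, resolving once it has been
  /// persisted. A minimal usage sketch (the offset and data here are
  /// hypothetical, and the crate path assumes this crate is named
  /// `write_journal`):
  ///
  /// ```no_run
  /// # async fn example(journal: &write_journal::WriteJournal) {
  /// let mut txn = journal.begin_transaction();
  /// txn.write(4096, vec![0u8; 16]);
  /// journal.commit_transaction(txn).await;
  /// # }
  /// ```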
  pub async fn commit_transaction(&self, txn: Transaction) {
    let (fut, fut_ctl) = SignalFuture::new();
    let None = self.pending.insert(txn.serial_no, (txn, fut_ctl)) else {
      unreachable!();
    };
    fut.await;
  }

  /// WARNING: Use this function with caution; misuse can cause logic errors, cache incoherency, and memory leaks. Carefully read notes/Overlay.md before using the overlay.
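  /// Reads prefer the overlay: if an entry exists at exactly `offset`, its data is returned (its length must equal `len`); otherwise the data is read from the device.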
  #[cfg(any(feature = "io_mmap", feature = "io_file"))]
  pub async fn read_with_overlay(&self, offset: u64, len: u64) -> TinyBuf {
    if let Some(e) = self.overlay.get(&offset) {
      let overlay_len = e.value().data.len();
      assert_eq!(
        overlay_len,
        usz!(len),
        "overlay data at {offset} has length {overlay_len} but requested length {len}"
      );
      e.value().data.clone()
    } else {
      self.device.read_at(offset, len).await.into()
    }
  }

  /// WARNING: Use this function with caution; misuse can cause logic errors, cache incoherency, and memory leaks. Carefully read notes/Overlay.md before using the overlay.
  /// WARNING: It's unlikely you want to use this function. It only removes the overlay entry, so subsequent reads will return stale device data, causing cache coherency problems; you most likely want to write blank/default data instead. However, it's available if you know what you're doing and have a need for it.
  pub async fn clear_from_overlay(&self, offset: u64) {
    self.overlay.remove(&offset);
  }

  #[cfg(any(feature = "io_mmap", feature = "io_file"))]
  pub async fn start_commit_background_loop(&self) {
    let mut next_serial = 0;

    // These are declared outside the loop and cleared after each iteration so
    // their allocations can be reused.
    let mut writes = Vec::new();
    let mut fut_ctls = Vec::new();
    let mut overlays_to_delete = Vec::new();
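    // Each iteration: drain consecutive pending transactions in serial order
    // into the journal buffer, persist the journal, apply the data writes,
    // signal the waiting committers, evict flushed overlay entries, then erase
    // the journal.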
    loop {
      sleep(self.commit_delay).await;

      let mut len = 0;
      let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
      // We must `remove` to take ownership of the write data and avoid copying. This means we need to reinsert into the map if we cannot process a transaction in this iteration.
      while let Some((serial_no, (txn, fut_ctl))) = self.pending.remove(&next_serial) {
        let entry_len = txn.serialised_byte_len();
        if len + entry_len > self.capacity - OFFSETOF_ENTRIES {
          // A transaction that cannot fit even in an empty journal will never
          // commit, so without this check we would loop forever.
          assert!(
            len > 0,
            "transaction {serial_no} has serialised length {entry_len} which exceeds the journal capacity"
          );
          // Out of space; requeue and wait until the next iteration.
          let None = self.pending.insert(serial_no, (txn, fut_ctl)) else {
            unreachable!();
          };
          break;
        };
        next_serial += 1;
        for w in txn.writes {
          let data_len: u32 = w.data.len().try_into().unwrap();
          raw.extend_from_slice(&w.offset.to_be_bytes());
          raw.extend_from_slice(&data_len.to_be_bytes());
          raw.extend_from_slice(&w.data);
          writes.push(WriteRequest::new(w.offset, w.data));
          if w.is_overlay {
            overlays_to_delete.push((w.offset, serial_no));
          };
        }
        len += entry_len;
        fut_ctls.push(fut_ctl);
      }
      if fut_ctls.is_empty() {
        // Assert these are empty, as each iteration expects to start with cleared reusable Vec values.
        assert!(overlays_to_delete.is_empty());
        assert!(writes.is_empty());
        continue;
      };
      raw.write_u32_be_at(OFFSETOF_LEN, u32::try_from(len).unwrap());
      let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
      raw.write_at(OFFSETOF_HASH, hash.as_bytes());
      self
        .device
        .write_at_with_delayed_sync(once(WriteRequest::new(self.offset, raw)))
        .await;

      self
        .device
        .write_at_with_delayed_sync(writes.drain(..))
        .await;

      for fut_ctl in fut_ctls.drain(..) {
        fut_ctl.signal(());
      }

      for (offset, serial_no) in overlays_to_delete.drain(..) {
        self
          .overlay
          .remove_if(&offset, |_, e| e.serial_no <= serial_no);
      }

      // We cannot use `write_at_with_delayed_sync` here, as we may write to the journal again before the delayed sync completes and conflict due to reordering.
      self
        .device
        .write_at(self.offset, self.generate_blank_state())
        .await;
      self.device.sync_data().await;
    }
  }
}