// write_journal/lib.rs

1use dashmap::DashMap;
2use off64::int::Off64ReadInt;
3use off64::int::Off64WriteMutInt;
4use off64::usz;
5use off64::Off64Read;
6use off64::Off64WriteMut;
7use seekable_async_file::SeekableAsyncFile;
8use seekable_async_file::WriteRequest;
9use signal_future::SignalFuture;
10use signal_future::SignalFutureController;
11use std::iter::once;
12use std::sync::atomic::AtomicU64;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::time::sleep;
16use tracing::info;
17use tracing::warn;
18
// Journal region layout: a 32-byte BLAKE3 hash of everything after it
// (length field + serialised entries), then a big-endian u32 byte length of
// the entries, then the entries themselves.
const OFFSETOF_HASH: u64 = 0;
const OFFSETOF_LEN: u64 = OFFSETOF_HASH + 32;
const OFFSETOF_ENTRIES: u64 = OFFSETOF_LEN + 4;
22
// Public so `TransactionWrite` can be used elsewhere (not just WriteJournal) e.g. mock journals.
pub struct TransactionWrite {
  /// Absolute device offset this write targets.
  pub offset: u64,
  /// Bytes to be written at `offset`.
  pub data: Vec<u8>,
  /// Whether this write was also inserted into the overlay (see notes/Overlay.md).
  pub is_overlay: bool,
}
29
/// A batch of writes that is journalled and applied atomically.
pub struct Transaction {
  // Monotonically increasing number assigned at creation; the commit loop
  // uses it to process transactions strictly in order.
  serial_no: u64,
  // Writes accumulated via `write`/`write_with_overlay`.
  writes: Vec<TransactionWrite>,
  // Shared overlay map (owned by the journal); `write_with_overlay` inserts here.
  overlay: Arc<DashMap<u64, OverlayEntry>>,
}
35
36impl Transaction {
37  fn serialised_byte_len(&self) -> u64 {
38    u64::try_from(
39      self
40        .writes
41        .iter()
42        .map(|w| 8 + 4 + w.data.len())
43        .sum::<usize>(),
44    )
45    .unwrap()
46  }
47
48  // Public so `Transaction` can be used elsewhere (not just WriteJournal) e.g. mock journals.
49  pub fn new(serial_no: u64, overlay: Arc<DashMap<u64, OverlayEntry>>) -> Self {
50    Self {
51      serial_no,
52      writes: Vec::new(),
53      overlay,
54    }
55  }
56
57  pub fn serial_no(&self) -> u64 {
58    self.serial_no
59  }
60
61  pub fn into_writes(self) -> Vec<TransactionWrite> {
62    self.writes
63  }
64
65  // Use generic so callers don't even need to `.into()` from Vec, array, etc.
66  pub fn write(&mut self, offset: u64, data: Vec<u8>) -> &mut Self {
67    self.writes.push(TransactionWrite {
68      offset,
69      data,
70      is_overlay: false,
71    });
72    self
73  }
74
75  /// WARNING: Use this function with caution, it's up to the caller to avoid the potential issues with misuse, including logic incorrectness, cache incoherency, and memory leaking. Carefully read notes/Overlay.md before using the overlay.
76  // Use generic so callers don't even need to `.into()` from Vec, array, etc.
77  pub fn write_with_overlay(&mut self, offset: u64, data: Vec<u8>) -> &mut Self {
78    self.overlay.insert(offset, OverlayEntry {
79      data: data.clone(),
80      serial_no: self.serial_no,
81    });
82    self.writes.push(TransactionWrite {
83      offset,
84      data,
85      is_overlay: true,
86    });
87    self
88  }
89}
90
// We cannot evict an overlay entry after a commit loop iteration if the data at the offset has since been updated again using the overlay while the commit loop was happening. This is why we need to track `serial_no`. This mechanism requires slower one-by-one deletes by the commit loop, but allows much faster parallel overlay reads with a DashMap. The alternative would be a RwLock over two maps, one for each generation, swapping them around after each loop iteration.
// Note that it's not necessary to ever evict for correctness (assuming the overlay is used correctly); eviction is done to avoid memory leaking.
// Public so `OverlayEntry` can be used elsewhere (not just WriteJournal) e.g. mock journals.
pub struct OverlayEntry {
  /// Overlay copy of the data most recently written at this offset.
  pub data: Vec<u8>,
  /// Serial number of the transaction that wrote this entry; used by the
  /// commit loop to avoid evicting an entry that a later transaction replaced.
  pub serial_no: u64,
}
98
/// A write-ahead journal occupying a fixed region of a device. Transactions
/// are queued via `commit_transaction` and batched, persisted, and applied by
/// `start_commit_background_loop`.
pub struct WriteJournal {
  // Underlying device/file holding both the journal region and the final data.
  device: SeekableAsyncFile,
  // Absolute device offset of the journal region.
  offset: u64,
  // Size in bytes of the journal region (header + entries).
  capacity: u64,
  // Transactions committed by callers but not yet processed, keyed by serial number.
  pending: DashMap<u64, (Transaction, SignalFutureController)>,
  // Source of unique, ordered transaction serial numbers.
  next_txn_serial_no: AtomicU64,
  // How long the commit loop sleeps between batch iterations.
  commit_delay: Duration,
  // Shared overlay; see comments on `OverlayEntry` for eviction semantics.
  overlay: Arc<DashMap<u64, OverlayEntry>>,
}
108
109impl WriteJournal {
110  pub fn new(
111    device: SeekableAsyncFile,
112    offset: u64,
113    capacity: u64,
114    commit_delay: Duration,
115  ) -> Self {
116    assert!(capacity > OFFSETOF_ENTRIES && capacity <= u32::MAX.into());
117    Self {
118      device,
119      offset,
120      capacity,
121      pending: Default::default(),
122      next_txn_serial_no: AtomicU64::new(0),
123      commit_delay,
124      overlay: Default::default(),
125    }
126  }
127
128  pub fn generate_blank_state(&self) -> Vec<u8> {
129    let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
130    raw.write_u32_be_at(OFFSETOF_LEN, 0u32);
131    let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
132    raw.write_at(OFFSETOF_HASH, hash.as_bytes());
133    raw
134  }
135
136  pub async fn format_device(&self) {
137    self
138      .device
139      .write_at(self.offset, self.generate_blank_state())
140      .await;
141  }
142
143  pub async fn recover(&self) {
144    let mut raw = self
145      .device
146      .read_at(self.offset, OFFSETOF_ENTRIES)
147      .await
148      .to_vec();
149    let len: u64 = raw.read_u32_be_at(OFFSETOF_LEN).into();
150    if len > self.capacity - OFFSETOF_ENTRIES {
151      warn!("journal is corrupt, has invalid length, skipping recovery");
152      return;
153    };
154    raw.extend_from_slice(
155      &mut self
156        .device
157        .read_at(self.offset + OFFSETOF_ENTRIES, len)
158        .await,
159    );
160    let expected_hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
161    let recorded_hash = &raw[..usz!(OFFSETOF_LEN)];
162    if expected_hash.as_bytes() != recorded_hash {
163      warn!("journal is corrupt, has invalid hash, skipping recovery");
164      return;
165    };
166    if len == 0 {
167      info!("journal is empty, no recovery necessary");
168      return;
169    };
170    let mut recovered_bytes_total = 0;
171    let mut journal_offset = OFFSETOF_ENTRIES;
172    while journal_offset < len {
173      let offset = raw.read_u64_be_at(journal_offset);
174      journal_offset += 8;
175      let data_len = raw.read_u32_be_at(journal_offset);
176      journal_offset += 4;
177      let data = raw.read_at(journal_offset, data_len.into()).to_vec();
178      journal_offset += u64::from(data_len);
179      self.device.write_at(offset, data).await;
180      recovered_bytes_total += data_len;
181    }
182    self.device.sync_data().await;
183
184    // WARNING: Make sure to sync writes BEFORE erasing journal.
185    self
186      .device
187      .write_at(self.offset, self.generate_blank_state())
188      .await;
189    self.device.sync_data().await;
190    info!(
191      recovered_entries = len,
192      recovered_bytes = recovered_bytes_total,
193      "journal has been recovered"
194    );
195  }
196
197  pub fn begin_transaction(&self) -> Transaction {
198    let serial_no = self
199      .next_txn_serial_no
200      .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
201    Transaction::new(serial_no, self.overlay.clone())
202  }
203
204  pub async fn commit_transaction(&self, txn: Transaction) {
205    let (fut, fut_ctl) = SignalFuture::new();
206    let None = self.pending.insert(txn.serial_no, (txn, fut_ctl)) else {
207      unreachable!();
208    };
209    fut.await;
210  }
211
212  /// WARNING: Use this function with caution, it's up to the caller to avoid the potential issues with misuse, including logic incorrectness, cache incoherency, and memory leaking. Carefully read notes/Overlay.md before using the overlay.
213  pub async fn read_with_overlay(&self, offset: u64, len: u64) -> Vec<u8> {
214    if let Some(e) = self.overlay.get(&offset) {
215      let overlay_len = e.value().data.len();
216      assert_eq!(
217        overlay_len,
218        usz!(len),
219        "overlay data at {offset} has length {overlay_len} but requested length {len}"
220      );
221      e.value().data.clone()
222    } else {
223      self.device.read_at(offset, len).await
224    }
225  }
226
227  /// WARNING: Use this function with caution, it's up to the caller to avoid the potential issues with misuse, including logic incorrectness, cache incoherency, and memory leaking. Carefully read notes/Overlay.md before using the overlay.
228  /// WARNING: It's unlikely you want to to use this function, as it will cause cache coherency problems as this only removes the overlay entry, so stale device data will be read instead. You most likely want to write blank/default data instead. However, this is available if you know what you're doing and have a need.
229  pub async fn clear_from_overlay(&self, offset: u64) {
230    self.overlay.remove(&offset);
231  }
232
233  pub async fn start_commit_background_loop(&self) {
234    let mut next_serial = 0;
235
236    // These are outside and cleared after each iteration to avoid reallocations.
237    let mut writes = Vec::new();
238    let mut fut_ctls = Vec::new();
239    let mut overlays_to_delete = Vec::new();
240    loop {
241      sleep(self.commit_delay).await;
242
243      let mut len = 0;
244      let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
245      // We must `remove` to take ownership of the write data and avoid copying. But this means we need to reinsert into the map if we cannot process a transaction in this iteration.
246      while let Some((serial_no, (txn, fut_ctl))) = self.pending.remove(&next_serial) {
247        let entry_len = txn.serialised_byte_len();
248        if len + entry_len > self.capacity - OFFSETOF_ENTRIES {
249          // Out of space, wait until next iteration.
250          // TODO THIS MUST PANIC IF THE FIRST, OTHERWISE WE'LL LOOP FOREVER.
251          let None = self.pending.insert(serial_no, (txn, fut_ctl)) else {
252            unreachable!();
253          };
254          break;
255        };
256        next_serial += 1;
257        for w in txn.writes {
258          let data_len: u32 = w.data.len().try_into().unwrap();
259          raw.extend_from_slice(&w.offset.to_be_bytes());
260          raw.extend_from_slice(&data_len.to_be_bytes());
261          raw.extend_from_slice(&w.data);
262          writes.push(WriteRequest::new(w.offset, w.data));
263          if w.is_overlay {
264            overlays_to_delete.push((w.offset, serial_no));
265          };
266        }
267        len += entry_len;
268        fut_ctls.push(fut_ctl);
269      }
270      if fut_ctls.is_empty() {
271        // Assert these are empty as each iteration expects to start with cleared reused Vec values.
272        assert!(overlays_to_delete.is_empty());
273        assert!(writes.is_empty());
274        continue;
275      };
276      raw.write_u32_be_at(OFFSETOF_LEN, u32::try_from(len).unwrap());
277      let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
278      raw.write_at(OFFSETOF_HASH, hash.as_bytes());
279      self
280        .device
281        .write_at_with_delayed_sync(once(WriteRequest::new(self.offset, raw)))
282        .await;
283
284      self
285        .device
286        .write_at_with_delayed_sync(writes.drain(..))
287        .await;
288
289      for fut_ctl in fut_ctls.drain(..) {
290        fut_ctl.signal(());
291      }
292
293      for (offset, serial_no) in overlays_to_delete.drain(..) {
294        self
295          .overlay
296          .remove_if(&offset, |_, e| e.serial_no <= serial_no);
297      }
298
299      // We cannot write_at_with_delayed_sync, as we may write to the journal again by then and have a conflict due to reordering.
300      self
301        .device
302        .write_at(self.offset, self.generate_blank_state())
303        .await;
304      self.device.sync_data().await;
305    }
306  }
307}