// write_journal/lib.rs

1use dashmap::DashMap;
2use off64::int::Off64ReadInt;
3use off64::int::Off64WriteMutInt;
4use off64::usz;
5use off64::Off64Read;
6use off64::Off64WriteMut;
7use seekable_async_file::SeekableAsyncFile;
8use seekable_async_file::WriteRequest;
9use signal_future::SignalFuture;
10use signal_future::SignalFutureController;
11use std::iter::once;
12use std::sync::atomic::AtomicU64;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::time::sleep;
16use tracing::info;
17use tracing::warn;
18
// On-device journal layout: a 32-byte BLAKE3 hash, then a 4-byte big-endian length of
// the serialised entries, then the entries themselves.
const OFFSETOF_HASH: u64 = 0;
// Length field sits immediately after the 32-byte hash.
const OFFSETOF_LEN: u64 = OFFSETOF_HASH + 32;
// Serialised entries begin after the 4-byte length field.
const OFFSETOF_ENTRIES: u64 = OFFSETOF_LEN + 4;
22
// Public so `Transaction` can be used elsewhere (not just WriteJournal) e.g. mock journals.
/// A single queued write: `data` to be persisted at `offset` on the device.
pub struct TransactionWrite {
  pub offset: u64,
  pub data: Vec<u8>,
  // True if this write was added via `write_with_overlay`, so the commit loop knows to
  // evict the corresponding overlay entry once the write is durable.
  pub is_overlay: bool,
}
29
/// An atomic batch of writes that will be made durable via the journal before being
/// applied to their final device offsets.
pub struct Transaction {
  // Monotonic id assigned by `WriteJournal::begin_transaction`; the commit loop
  // persists transactions strictly in serial order.
  serial_no: u64,
  // Writes queued so far, applied in insertion order.
  writes: Vec<TransactionWrite>,
  // Shared overlay map, populated by `write_with_overlay`.
  overlay: Arc<DashMap<u64, OverlayEntry>>,
}
35
36impl Transaction {
37  fn serialised_byte_len(&self) -> u64 {
38    u64::try_from(
39      self
40        .writes
41        .iter()
42        .map(|w| 8 + 4 + w.data.len())
43        .sum::<usize>(),
44    )
45    .unwrap()
46  }
47
48  // Public so `Transaction` can be used elsewhere (not just WriteJournal) e.g. mock journals.
49  pub fn new(serial_no: u64, overlay: Arc<DashMap<u64, OverlayEntry>>) -> Self {
50    Self {
51      serial_no,
52      writes: Vec::new(),
53      overlay,
54    }
55  }
56
57  pub fn serial_no(&self) -> u64 {
58    self.serial_no
59  }
60
61  pub fn into_writes(self) -> Vec<TransactionWrite> {
62    self.writes
63  }
64
65  // Use generic so callers don't even need to `.into()` from Vec, array, etc.
66  pub fn write(&mut self, offset: u64, data: Vec<u8>) -> &mut Self {
67    self.writes.push(TransactionWrite {
68      offset,
69      data,
70      is_overlay: false,
71    });
72    self
73  }
74
75  /// WARNING: Use this function with caution, it's up to the caller to avoid the potential issues with misuse, including logic incorrectness, cache incoherency, and memory leaking. Carefully read notes/Overlay.md before using the overlay.
76  // Use generic so callers don't even need to `.into()` from Vec, array, etc.
77  pub fn write_with_overlay(&mut self, offset: u64, data: Vec<u8>) -> &mut Self {
78    self.overlay.insert(offset, OverlayEntry {
79      data: data.clone(),
80      serial_no: self.serial_no,
81    });
82    self.writes.push(TransactionWrite {
83      offset,
84      data,
85      is_overlay: true,
86    });
87    self
88  }
89}
90
// We cannot evict an overlay entry after a commit loop iteration if the data at the offset has since been updated again using the overlay while the commit loop was happening. This is why we need to track `serial_no`. This mechanism requires slower one-by-one deletes by the commit loop, but allows much faster parallel overlay reads with a DashMap. The alternative would be a RwLock over two maps, one for each generation, swapping them around after each loop iteration.
// Note that it's not necessary to ever evict for correctness (assuming the overlay is used correctly); eviction is done to avoid memory leaking.
// Public so `OverlayEntry` can be used elsewhere (not just WriteJournal) e.g. mock journals.
pub struct OverlayEntry {
  pub data: Vec<u8>,
  // Serial no. of the transaction that last wrote this entry; used by the commit loop
  // to avoid evicting an entry overwritten while the commit was in flight.
  pub serial_no: u64,
}
98
/// A write-ahead journal stored in a fixed region of a device: transactions are first
/// persisted to the journal, then applied to their final offsets, so a crash mid-apply
/// can be recovered by replaying the journal.
pub struct WriteJournal {
  device: SeekableAsyncFile,
  // Start of the journal region on the device.
  offset: u64,
  // Size of the journal region in bytes; header plus entries must fit within it.
  capacity: u64,
  // Committed-but-not-yet-persisted transactions, keyed by serial no.
  pending: DashMap<u64, (Transaction, SignalFutureController)>,
  next_txn_serial_no: AtomicU64,
  // How long the background commit loop sleeps between iterations.
  commit_delay: Duration,
  // Shared with every `Transaction` so overlay writes are visible to readers immediately.
  overlay: Arc<DashMap<u64, OverlayEntry>>,
}
108
109impl WriteJournal {
110  pub fn new(
111    device: SeekableAsyncFile,
112    offset: u64,
113    capacity: u64,
114    commit_delay: Duration,
115  ) -> Self {
116    assert!(capacity > OFFSETOF_ENTRIES && capacity <= u32::MAX.into());
117    Self {
118      device,
119      offset,
120      capacity,
121      pending: Default::default(),
122      next_txn_serial_no: AtomicU64::new(0),
123      commit_delay,
124      overlay: Default::default(),
125    }
126  }
127
128  pub fn generate_blank_state(&self) -> Vec<u8> {
129    let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
130    raw.write_u32_be_at(OFFSETOF_LEN, 0u32);
131    let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
132    raw.write_at(OFFSETOF_HASH, hash.as_bytes());
133    raw
134  }
135
136  #[cfg(any(feature = "io_mmap", feature = "io_file"))]
137  pub async fn format_device(&self) {
138    self
139      .device
140      .write_at(self.offset, self.generate_blank_state())
141      .await;
142  }
143
144  #[cfg(any(feature = "io_mmap", feature = "io_file"))]
145  pub async fn recover(&self) {
146    let mut raw = self
147      .device
148      .read_at(self.offset, OFFSETOF_ENTRIES)
149      .await
150      .to_vec();
151    let len: u64 = raw.read_u32_be_at(OFFSETOF_LEN).into();
152    if len > self.capacity - OFFSETOF_ENTRIES {
153      warn!("journal is corrupt, has invalid length, skipping recovery");
154      return;
155    };
156    raw.extend_from_slice(
157      &mut self
158        .device
159        .read_at(self.offset + OFFSETOF_ENTRIES, len)
160        .await,
161    );
162    let expected_hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
163    let recorded_hash = &raw[..usz!(OFFSETOF_LEN)];
164    if expected_hash.as_bytes() != recorded_hash {
165      warn!("journal is corrupt, has invalid hash, skipping recovery");
166      return;
167    };
168    if len == 0 {
169      info!("journal is empty, no recovery necessary");
170      return;
171    };
172    let mut recovered_bytes_total = 0;
173    let mut journal_offset = OFFSETOF_ENTRIES;
174    while journal_offset < len {
175      let offset = raw.read_u64_be_at(journal_offset);
176      journal_offset += 8;
177      let data_len = raw.read_u32_be_at(journal_offset);
178      journal_offset += 4;
179      let data = raw.read_at(journal_offset, data_len.into()).to_vec();
180      journal_offset += u64::from(data_len);
181      self.device.write_at(offset, data).await;
182      recovered_bytes_total += data_len;
183    }
184    self.device.sync_data().await;
185
186    // WARNING: Make sure to sync writes BEFORE erasing journal.
187    self
188      .device
189      .write_at(self.offset, self.generate_blank_state())
190      .await;
191    self.device.sync_data().await;
192    info!(
193      recovered_entries = len,
194      recovered_bytes = recovered_bytes_total,
195      "journal has been recovered"
196    );
197  }
198
199  pub fn begin_transaction(&self) -> Transaction {
200    let serial_no = self
201      .next_txn_serial_no
202      .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
203    Transaction::new(serial_no, self.overlay.clone())
204  }
205
206  pub async fn commit_transaction(&self, txn: Transaction) {
207    let (fut, fut_ctl) = SignalFuture::new();
208    let None = self.pending.insert(txn.serial_no, (txn, fut_ctl)) else {
209      unreachable!();
210    };
211    fut.await;
212  }
213
214  /// WARNING: Use this function with caution, it's up to the caller to avoid the potential issues with misuse, including logic incorrectness, cache incoherency, and memory leaking. Carefully read notes/Overlay.md before using the overlay.
215  #[cfg(any(feature = "io_mmap", feature = "io_file"))]
216  pub async fn read_with_overlay(&self, offset: u64, len: u64) -> Vec<u8> {
217    if let Some(e) = self.overlay.get(&offset) {
218      let overlay_len = e.value().data.len();
219      assert_eq!(
220        overlay_len,
221        usz!(len),
222        "overlay data at {offset} has length {overlay_len} but requested length {len}"
223      );
224      e.value().data.clone()
225    } else {
226      self.device.read_at(offset, len).await
227    }
228  }
229
230  /// WARNING: Use this function with caution, it's up to the caller to avoid the potential issues with misuse, including logic incorrectness, cache incoherency, and memory leaking. Carefully read notes/Overlay.md before using the overlay.
231  /// WARNING: It's unlikely you want to to use this function, as it will cause cache coherency problems as this only removes the overlay entry, so stale device data will be read instead. You most likely want to write blank/default data instead. However, this is available if you know what you're doing and have a need.
232  pub async fn clear_from_overlay(&self, offset: u64) {
233    self.overlay.remove(&offset);
234  }
235
236  #[cfg(any(feature = "io_mmap", feature = "io_file"))]
237  pub async fn start_commit_background_loop(&self) {
238    let mut next_serial = 0;
239
240    // These are outside and cleared after each iteration to avoid reallocations.
241    let mut writes = Vec::new();
242    let mut fut_ctls = Vec::new();
243    let mut overlays_to_delete = Vec::new();
244    loop {
245      sleep(self.commit_delay).await;
246
247      let mut len = 0;
248      let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
249      // We must `remove` to take ownership of the write data and avoid copying. But this means we need to reinsert into the map if we cannot process a transaction in this iteration.
250      while let Some((serial_no, (txn, fut_ctl))) = self.pending.remove(&next_serial) {
251        let entry_len = txn.serialised_byte_len();
252        if len + entry_len > self.capacity - OFFSETOF_ENTRIES {
253          // Out of space, wait until next iteration.
254          // TODO THIS MUST PANIC IF THE FIRST, OTHERWISE WE'LL LOOP FOREVER.
255          let None = self.pending.insert(serial_no, (txn, fut_ctl)) else {
256            unreachable!();
257          };
258          break;
259        };
260        next_serial += 1;
261        for w in txn.writes {
262          let data_len: u32 = w.data.len().try_into().unwrap();
263          raw.extend_from_slice(&w.offset.to_be_bytes());
264          raw.extend_from_slice(&data_len.to_be_bytes());
265          raw.extend_from_slice(&w.data);
266          writes.push(WriteRequest::new(w.offset, w.data));
267          if w.is_overlay {
268            overlays_to_delete.push((w.offset, serial_no));
269          };
270        }
271        len += entry_len;
272        fut_ctls.push(fut_ctl);
273      }
274      if fut_ctls.is_empty() {
275        // Assert these are empty as each iteration expects to start with cleared reused Vec values.
276        assert!(overlays_to_delete.is_empty());
277        assert!(writes.is_empty());
278        continue;
279      };
280      raw.write_u32_be_at(OFFSETOF_LEN, u32::try_from(len).unwrap());
281      let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
282      raw.write_at(OFFSETOF_HASH, hash.as_bytes());
283      self
284        .device
285        .write_at_with_delayed_sync(once(WriteRequest::new(self.offset, raw)))
286        .await;
287
288      self
289        .device
290        .write_at_with_delayed_sync(writes.drain(..))
291        .await;
292
293      for fut_ctl in fut_ctls.drain(..) {
294        fut_ctl.signal(());
295      }
296
297      for (offset, serial_no) in overlays_to_delete.drain(..) {
298        self
299          .overlay
300          .remove_if(&offset, |_, e| e.serial_no <= serial_no);
301      }
302
303      // We cannot write_at_with_delayed_sync, as we may write to the journal again by then and have a conflict due to reordering.
304      self
305        .device
306        .write_at(self.offset, self.generate_blank_state())
307        .await;
308      self.device.sync_data().await;
309    }
310  }
311}