// write_journal/lib.rs

1use dashmap::DashMap;
2use off64::int::Off64ReadInt;
3use off64::int::Off64WriteMutInt;
4use off64::usz;
5use off64::Off64Read;
6use off64::Off64WriteMut;
7use seekable_async_file::SeekableAsyncFile;
8use seekable_async_file::WriteRequest;
9use signal_future::SignalFuture;
10use signal_future::SignalFutureController;
11use std::iter::once;
12use std::sync::atomic::AtomicU64;
13use std::sync::Arc;
14use std::time::Duration;
15use tinybuf::TinyBuf;
16use tokio::time::sleep;
17use tracing::info;
18use tracing::warn;
19
// On-disk journal layout:
//   [0, 32)  BLAKE3 hash covering everything after this field (i.e. from OFFSETOF_LEN).
//   [32, 36) u32 BE total byte length of all serialised entries (header excluded).
//   [36, ..) entries, each serialised as { u64 BE device offset, u32 BE data length, data }.
const OFFSETOF_HASH: u64 = 0;
const OFFSETOF_LEN: u64 = OFFSETOF_HASH + 32;
const OFFSETOF_ENTRIES: u64 = OFFSETOF_LEN + 4;
23
// Public so `Transaction` can be used elsewhere (not just WriteJournal) e.g. mock journals.
/// A single write queued inside a [`Transaction`]: `data` destined for device `offset`.
pub struct TransactionWrite {
  /// Absolute device offset the data will be written to.
  pub offset: u64,
  /// The bytes to write.
  pub data: TinyBuf,
  /// True if this write was also inserted into the overlay (via `write_with_overlay`),
  /// which tells the commit loop to evict the overlay entry once the write is durable.
  pub is_overlay: bool,
}
30
/// A batch of writes committed atomically via the journal.
pub struct Transaction {
  // Unique ID handed out by `WriteJournal::begin_transaction` (monotonic fetch_add);
  // used to order commits and to guard overlay eviction.
  serial_no: u64,
  // Writes in the order they were queued.
  writes: Vec<TransactionWrite>,
  // Shared overlay map; `write_with_overlay` inserts into it immediately, before commit.
  overlay: Arc<DashMap<u64, OverlayEntry>>,
}
36
37impl Transaction {
38  fn serialised_byte_len(&self) -> u64 {
39    u64::try_from(
40      self
41        .writes
42        .iter()
43        .map(|w| 8 + 4 + w.data.len())
44        .sum::<usize>(),
45    )
46    .unwrap()
47  }
48
49  // Public so `Transaction` can be used elsewhere (not just WriteJournal) e.g. mock journals.
50  pub fn new(serial_no: u64, overlay: Arc<DashMap<u64, OverlayEntry>>) -> Self {
51    Self {
52      serial_no,
53      writes: Vec::new(),
54      overlay,
55    }
56  }
57
58  pub fn serial_no(&self) -> u64 {
59    self.serial_no
60  }
61
62  pub fn into_writes(self) -> Vec<TransactionWrite> {
63    self.writes
64  }
65
66  // Use generic so callers don't even need to `.into()` from Vec, array, etc.
67  pub fn write<D: Into<TinyBuf>>(&mut self, offset: u64, data: D) -> &mut Self {
68    let data = data.into();
69    self.writes.push(TransactionWrite {
70      offset,
71      data,
72      is_overlay: false,
73    });
74    self
75  }
76
77  /// WARNING: Use this function with caution, it's up to the caller to avoid the potential issues with misuse, including logic incorrectness, cache incoherency, and memory leaking. Carefully read notes/Overlay.md before using the overlay.
78  // Use generic so callers don't even need to `.into()` from Vec, array, etc.
79  pub fn write_with_overlay<D: Into<TinyBuf>>(&mut self, offset: u64, data: D) -> &mut Self {
80    let data = data.into();
81    self.overlay.insert(offset, OverlayEntry {
82      data: data.clone(),
83      serial_no: self.serial_no,
84    });
85    self.writes.push(TransactionWrite {
86      offset,
87      data,
88      is_overlay: true,
89    });
90    self
91  }
92}
93
// We cannot evict an overlay entry after a commit loop iteration if the data at the offset has since been updated again using the overlay while the commit loop was happening. This is why we need to track `serial_no`. This mechanism requires slower one-by-one deletes by the commit loop, but allows much faster parallel overlay reads with a DashMap. The alternative would be a RwLock over two maps, one for each generation, swapping them around after each loop iteration.
// Note that it's not necessary to ever evict for correctness (assuming the overlay is used correctly); eviction is done to avoid memory leaking.
// Public so `OverlayEntry` can be used elsewhere (not just WriteJournal) e.g. mock journals.
pub struct OverlayEntry {
  /// The most recently overlay-written data for this offset.
  pub data: TinyBuf,
  /// Serial number of the transaction that wrote this entry; the commit loop only
  /// evicts an entry whose serial number is <= the committed transaction's (see above).
  pub serial_no: u64,
}
101
/// A write-ahead journal stored in a fixed-capacity region of a device, with an
/// optional read overlay and a background batch-commit loop.
pub struct WriteJournal {
  device: SeekableAsyncFile,
  // Byte offset of the journal region on the device.
  offset: u64,
  // Size of the journal region in bytes, including the header.
  capacity: u64,
  // Committed-but-not-yet-persisted transactions keyed by serial number, each paired
  // with the controller that resolves the committer's awaited future.
  pending: DashMap<u64, (Transaction, SignalFutureController)>,
  // Source of unique transaction serial numbers.
  next_txn_serial_no: AtomicU64,
  // How long the commit loop sleeps between iterations.
  commit_delay: Duration,
  // Shared with every `Transaction`; see `OverlayEntry`.
  overlay: Arc<DashMap<u64, OverlayEntry>>,
}
111
112impl WriteJournal {
113  pub fn new(
114    device: SeekableAsyncFile,
115    offset: u64,
116    capacity: u64,
117    commit_delay: Duration,
118  ) -> Self {
119    assert!(capacity > OFFSETOF_ENTRIES && capacity <= u32::MAX.into());
120    Self {
121      device,
122      offset,
123      capacity,
124      pending: Default::default(),
125      next_txn_serial_no: AtomicU64::new(0),
126      commit_delay,
127      overlay: Default::default(),
128    }
129  }
130
131  pub fn generate_blank_state(&self) -> Vec<u8> {
132    let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
133    raw.write_u32_be_at(OFFSETOF_LEN, 0u32);
134    let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
135    raw.write_at(OFFSETOF_HASH, hash.as_bytes());
136    raw
137  }
138
139  #[cfg(any(feature = "io_mmap", feature = "io_file"))]
140  pub async fn format_device(&self) {
141    self
142      .device
143      .write_at(self.offset, self.generate_blank_state())
144      .await;
145  }
146
147  #[cfg(any(feature = "io_mmap", feature = "io_file"))]
148  pub async fn recover(&self) {
149    let mut raw = self
150      .device
151      .read_at(self.offset, OFFSETOF_ENTRIES)
152      .await
153      .to_vec();
154    let len: u64 = raw.read_u32_be_at(OFFSETOF_LEN).into();
155    if len > self.capacity - OFFSETOF_ENTRIES {
156      warn!("journal is corrupt, has invalid length, skipping recovery");
157      return;
158    };
159    raw.extend_from_slice(
160      &mut self
161        .device
162        .read_at(self.offset + OFFSETOF_ENTRIES, len)
163        .await,
164    );
165    let expected_hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
166    let recorded_hash = &raw[..usz!(OFFSETOF_LEN)];
167    if expected_hash.as_bytes() != recorded_hash {
168      warn!("journal is corrupt, has invalid hash, skipping recovery");
169      return;
170    };
171    if len == 0 {
172      info!("journal is empty, no recovery necessary");
173      return;
174    };
175    let mut recovered_bytes_total = 0;
176    let mut journal_offset = OFFSETOF_ENTRIES;
177    while journal_offset < len {
178      let offset = raw.read_u64_be_at(journal_offset);
179      journal_offset += 8;
180      let data_len = raw.read_u32_be_at(journal_offset);
181      journal_offset += 4;
182      let data = TinyBuf::from_slice(raw.read_at(journal_offset, data_len.into()));
183      journal_offset += u64::from(data_len);
184      self.device.write_at(offset, data).await;
185      recovered_bytes_total += data_len;
186    }
187    self.device.sync_data().await;
188
189    // WARNING: Make sure to sync writes BEFORE erasing journal.
190    self
191      .device
192      .write_at(self.offset, self.generate_blank_state())
193      .await;
194    self.device.sync_data().await;
195    info!(
196      recovered_entries = len,
197      recovered_bytes = recovered_bytes_total,
198      "journal has been recovered"
199    );
200  }
201
202  pub fn begin_transaction(&self) -> Transaction {
203    let serial_no = self
204      .next_txn_serial_no
205      .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
206    Transaction::new(serial_no, self.overlay.clone())
207  }
208
209  pub async fn commit_transaction(&self, txn: Transaction) {
210    let (fut, fut_ctl) = SignalFuture::new();
211    let None = self.pending.insert(txn.serial_no, (txn, fut_ctl)) else {
212      unreachable!();
213    };
214    fut.await;
215  }
216
217  /// WARNING: Use this function with caution, it's up to the caller to avoid the potential issues with misuse, including logic incorrectness, cache incoherency, and memory leaking. Carefully read notes/Overlay.md before using the overlay.
218  #[cfg(any(feature = "io_mmap", feature = "io_file"))]
219  pub async fn read_with_overlay(&self, offset: u64, len: u64) -> TinyBuf {
220    if let Some(e) = self.overlay.get(&offset) {
221      let overlay_len = e.value().data.len();
222      assert_eq!(
223        overlay_len,
224        usz!(len),
225        "overlay data at {offset} has length {overlay_len} but requested length {len}"
226      );
227      e.value().data.clone()
228    } else {
229      self.device.read_at(offset, len).await.into()
230    }
231  }
232
233  /// WARNING: Use this function with caution, it's up to the caller to avoid the potential issues with misuse, including logic incorrectness, cache incoherency, and memory leaking. Carefully read notes/Overlay.md before using the overlay.
234  /// WARNING: It's unlikely you want to to use this function, as it will cause cache coherency problems as this only removes the overlay entry, so stale device data will be read instead. You most likely want to write blank/default data instead. However, this is available if you know what you're doing and have a need.
235  pub async fn clear_from_overlay(&self, offset: u64) {
236    self.overlay.remove(&offset);
237  }
238
239  #[cfg(any(feature = "io_mmap", feature = "io_file"))]
240  pub async fn start_commit_background_loop(&self) {
241    let mut next_serial = 0;
242
243    // These are outside and cleared after each iteration to avoid reallocations.
244    let mut writes = Vec::new();
245    let mut fut_ctls = Vec::new();
246    let mut overlays_to_delete = Vec::new();
247    loop {
248      sleep(self.commit_delay).await;
249
250      let mut len = 0;
251      let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
252      // We must `remove` to take ownership of the write data and avoid copying. But this means we need to reinsert into the map if we cannot process a transaction in this iteration.
253      while let Some((serial_no, (txn, fut_ctl))) = self.pending.remove(&next_serial) {
254        let entry_len = txn.serialised_byte_len();
255        if len + entry_len > self.capacity - OFFSETOF_ENTRIES {
256          // Out of space, wait until next iteration.
257          // TODO THIS MUST PANIC IF THE FIRST, OTHERWISE WE'LL LOOP FOREVER.
258          let None = self.pending.insert(serial_no, (txn, fut_ctl)) else {
259            unreachable!();
260          };
261          break;
262        };
263        next_serial += 1;
264        for w in txn.writes {
265          let data_len: u32 = w.data.len().try_into().unwrap();
266          raw.extend_from_slice(&w.offset.to_be_bytes());
267          raw.extend_from_slice(&data_len.to_be_bytes());
268          raw.extend_from_slice(&w.data);
269          writes.push(WriteRequest::new(w.offset, w.data));
270          if w.is_overlay {
271            overlays_to_delete.push((w.offset, serial_no));
272          };
273        }
274        len += entry_len;
275        fut_ctls.push(fut_ctl);
276      }
277      if fut_ctls.is_empty() {
278        // Assert these are empty as each iteration expects to start with cleared reused Vec values.
279        assert!(overlays_to_delete.is_empty());
280        assert!(writes.is_empty());
281        continue;
282      };
283      raw.write_u32_be_at(OFFSETOF_LEN, u32::try_from(len).unwrap());
284      let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
285      raw.write_at(OFFSETOF_HASH, hash.as_bytes());
286      self
287        .device
288        .write_at_with_delayed_sync(once(WriteRequest::new(self.offset, raw)))
289        .await;
290
291      self
292        .device
293        .write_at_with_delayed_sync(writes.drain(..))
294        .await;
295
296      for fut_ctl in fut_ctls.drain(..) {
297        fut_ctl.signal(());
298      }
299
300      for (offset, serial_no) in overlays_to_delete.drain(..) {
301        self
302          .overlay
303          .remove_if(&offset, |_, e| e.serial_no <= serial_no);
304      }
305
306      // We cannot write_at_with_delayed_sync, as we may write to the journal again by then and have a conflict due to reordering.
307      self
308        .device
309        .write_at(self.offset, self.generate_blank_state())
310        .await;
311      self.device.sync_data().await;
312    }
313  }
314}