write_journal/
lib.rs

1use dashmap::DashMap;
2use off64::int::Off64ReadInt;
3use off64::int::Off64WriteMutInt;
4use off64::usz;
5use off64::Off64Read;
6use off64::Off64WriteMut;
7use rustc_hash::FxHasher;
8use seekable_async_file::SeekableAsyncFile;
9use seekable_async_file::WriteRequest;
10use signal_future::SignalFuture;
11use signal_future::SignalFutureController;
12use std::hash::BuildHasherDefault;
13use std::iter::once;
14use std::sync::atomic::AtomicU64;
15use std::sync::Arc;
16use std::time::Duration;
17use tinybuf::TinyBuf;
18use tokio::time::sleep;
19use tracing::info;
20use tracing::warn;
21
// On-device journal header layout:
// [0, 32): BLAKE3 hash covering everything from OFFSETOF_LEN to the end of the entries.
const OFFSETOF_HASH: u64 = 0;
// [32, 36): big-endian u32 byte length of the serialised entries (header excluded).
const OFFSETOF_LEN: u64 = OFFSETOF_HASH + 32;
// [36, ..): serialised entries; each entry is: u64 BE device offset, u32 BE data length, data bytes.
const OFFSETOF_ENTRIES: u64 = OFFSETOF_LEN + 4;
25
/// A single pending device write recorded within a `Transaction`.
struct TransactionWrite {
  // Absolute device offset to write `data` at.
  offset: u64,
  // Bytes to write.
  data: TinyBuf,
  // True if this write was also published to the overlay; the commit loop
  // uses this to know which overlay entries to evict after persisting.
  is_overlay: bool,
}
31
/// A batch of writes committed atomically through the journal.
/// Created by `WriteJournal::begin_transaction` and submitted via
/// `WriteJournal::commit_transaction`.
pub struct Transaction {
  // Monotonically increasing ID; the commit loop drains transactions strictly
  // in serial order, and overlay eviction is gated on it.
  serial_no: u64,
  // Writes queued so far, applied in insertion order.
  writes: Vec<TransactionWrite>,
  // Shared handle to the journal's overlay map (see `OverlayEntry`).
  overlay: Arc<DashMap<u64, OverlayEntry, BuildHasherDefault<FxHasher>>>,
}
37
38impl Transaction {
39  fn serialised_byte_len(&self) -> u64 {
40    u64::try_from(
41      self
42        .writes
43        .iter()
44        .map(|w| 8 + 4 + w.data.len())
45        .sum::<usize>(),
46    )
47    .unwrap()
48  }
49
50  // Use generic so callers don't even need to `.into()` from Vec, array, etc.
51  pub fn write<D: Into<TinyBuf>>(&mut self, offset: u64, data: D) -> &mut Self {
52    let data = data.into();
53    self.writes.push(TransactionWrite {
54      offset,
55      data,
56      is_overlay: false,
57    });
58    self
59  }
60
61  /// WARNING: Use this function with caution, it's up to the caller to avoid the potential issues with misuse, including logic incorrectness, cache incoherency, and memory leaking. Carefully read notes/Overlay.md before using the overlay.
62  // Use generic so callers don't even need to `.into()` from Vec, array, etc.
63  pub fn write_with_overlay<D: Into<TinyBuf>>(&mut self, offset: u64, data: D) -> &mut Self {
64    let data = data.into();
65    self.overlay.insert(offset, OverlayEntry {
66      data: data.clone(),
67      serial_no: self.serial_no,
68    });
69    self.writes.push(TransactionWrite {
70      offset,
71      data,
72      is_overlay: true,
73    });
74    self
75  }
76}
77
78// We cannot evict an overlay entry after a commit loop iteration if the data at the offset has since been updated again using the overlay while the commit loop was happening. This is why we need to track `serial_no`. This mechanism requires slower one-by-one deletes by the commit loop, but allows much faster parallel overlay reads with a DashMap. The alternative would be a RwLock over two maps, one for each generation, swapping them around after each loop iteration.
79// Note that it's not necessary to ever evict for correctness (assuming the overlay is used correctly); eviction is done to avoid memory leaking.
struct OverlayEntry {
  // Latest overlay-written bytes for this offset.
  data: TinyBuf,
  // Serial number of the transaction that last wrote this entry; the commit
  // loop only evicts entries whose serial_no is <= the committed transaction's.
  serial_no: u64,
}
84
/// A write-ahead journal stored in a fixed-size region of `device`.
/// Transactions are serialised into the journal region, synced, applied to
/// their final device locations, and then the journal region is erased.
pub struct WriteJournal {
  device: SeekableAsyncFile,
  // Device offset of the journal region.
  offset: u64,
  // Byte size of the journal region (header + entries); must exceed the
  // header size and fit in a u32 (the on-device length field is a u32).
  capacity: u64,
  // Submitted-but-not-yet-durable transactions keyed by serial number, each
  // paired with the future controller to signal once persisted.
  pending: DashMap<u64, (Transaction, SignalFutureController), BuildHasherDefault<FxHasher>>,
  // Source of unique, monotonically increasing transaction serial numbers.
  next_txn_serial_no: AtomicU64,
  // How long the commit loop sleeps between batch flushes.
  commit_delay: Duration,
  // Shared overlay enabling read-your-writes before data is durable; shared
  // with every `Transaction` (see `OverlayEntry` for eviction rules).
  overlay: Arc<DashMap<u64, OverlayEntry, BuildHasherDefault<FxHasher>>>,
}
94
95impl WriteJournal {
96  pub fn new(
97    device: SeekableAsyncFile,
98    offset: u64,
99    capacity: u64,
100    commit_delay: Duration,
101  ) -> Self {
102    assert!(capacity > OFFSETOF_ENTRIES && capacity <= u32::MAX.into());
103    Self {
104      device,
105      offset,
106      capacity,
107      pending: Default::default(),
108      next_txn_serial_no: AtomicU64::new(0),
109      commit_delay,
110      overlay: Default::default(),
111    }
112  }
113
114  pub fn generate_blank_state(&self) -> Vec<u8> {
115    let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
116    raw.write_u32_be_at(OFFSETOF_LEN, 0u32);
117    let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
118    raw.write_at(OFFSETOF_HASH, hash.as_bytes());
119    raw
120  }
121
122  pub async fn format_device(&self) {
123    self
124      .device
125      .write_at(self.offset, self.generate_blank_state())
126      .await;
127  }
128
129  pub async fn recover(&self) {
130    let mut raw = self
131      .device
132      .read_at(self.offset, OFFSETOF_ENTRIES)
133      .await
134      .to_vec();
135    let len: u64 = raw.read_u32_be_at(OFFSETOF_LEN).into();
136    if len > self.capacity - OFFSETOF_ENTRIES {
137      warn!("journal is corrupt, has invalid length, skipping recovery");
138      return;
139    };
140    raw.extend_from_slice(
141      &mut self
142        .device
143        .read_at(self.offset + OFFSETOF_ENTRIES, len)
144        .await,
145    );
146    let expected_hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
147    let recorded_hash = &raw[..usz!(OFFSETOF_LEN)];
148    if expected_hash.as_bytes() != recorded_hash {
149      warn!("journal is corrupt, has invalid hash, skipping recovery");
150      return;
151    };
152    if len == 0 {
153      info!("journal is empty, no recovery necessary");
154      return;
155    };
156    let mut recovered_bytes_total = 0;
157    let mut journal_offset = OFFSETOF_ENTRIES;
158    while journal_offset < len {
159      let offset = raw.read_u64_be_at(journal_offset);
160      journal_offset += 8;
161      let data_len = raw.read_u32_be_at(journal_offset);
162      journal_offset += 4;
163      let data = TinyBuf::from_slice(raw.read_at(journal_offset, data_len.into()));
164      journal_offset += u64::from(data_len);
165      self.device.write_at(offset, data).await;
166      recovered_bytes_total += data_len;
167    }
168    self.device.sync_data().await;
169
170    // WARNING: Make sure to sync writes BEFORE erasing journal.
171    self
172      .device
173      .write_at(self.offset, self.generate_blank_state())
174      .await;
175    self.device.sync_data().await;
176    info!(
177      recovered_entries = len,
178      recovered_bytes = recovered_bytes_total,
179      "journal has been recovered"
180    );
181  }
182
183  pub fn begin_transaction(&self) -> Transaction {
184    let serial_no = self
185      .next_txn_serial_no
186      .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
187    Transaction {
188      serial_no,
189      writes: Vec::new(),
190      overlay: self.overlay.clone(),
191    }
192  }
193
194  pub async fn commit_transaction(&self, txn: Transaction) {
195    let (fut, fut_ctl) = SignalFuture::new();
196    let None = self.pending.insert(txn.serial_no, (txn, fut_ctl)) else {
197      unreachable!();
198    };
199    fut.await;
200  }
201
202  /// WARNING: Use this function with caution, it's up to the caller to avoid the potential issues with misuse, including logic incorrectness, cache incoherency, and memory leaking. Carefully read notes/Overlay.md before using the overlay.
203  pub async fn read_with_overlay(&self, offset: u64, len: u64) -> TinyBuf {
204    if let Some(e) = self.overlay.get(&offset) {
205      let overlay_len = e.value().data.len();
206      assert_eq!(
207        overlay_len,
208        usz!(len),
209        "overlay data at {offset} has length {overlay_len} but requested length {len}"
210      );
211      e.value().data.clone()
212    } else {
213      self.device.read_at(offset, len).await.into()
214    }
215  }
216
217  /// WARNING: Use this function with caution, it's up to the caller to avoid the potential issues with misuse, including logic incorrectness, cache incoherency, and memory leaking. Carefully read notes/Overlay.md before using the overlay.
218  /// WARNING: It's unlikely you want to to use this function, as it will cause cache coherency problems as this only removes the overlay entry, so stale device data will be read instead. You most likely want to write blank/default data instead. However, this is available if you know what you're doing and have a need.
219  pub async fn clear_from_overlay(&self, offset: u64) {
220    self.overlay.remove(&offset);
221  }
222
223  pub async fn start_commit_background_loop(&self) {
224    let mut next_serial = 0;
225
226    // These are outside and cleared after each iteration to avoid reallocations.
227    let mut writes = Vec::new();
228    let mut fut_ctls = Vec::new();
229    let mut overlays_to_delete = Vec::new();
230    loop {
231      sleep(self.commit_delay).await;
232
233      let mut len = 0;
234      let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
235      // We must `remove` to take ownership of the write data and avoid copying. But this means we need to reinsert into the map if we cannot process a transaction in this iteration.
236      while let Some((serial_no, (txn, fut_ctl))) = self.pending.remove(&next_serial) {
237        let entry_len = txn.serialised_byte_len();
238        if len + entry_len > self.capacity - OFFSETOF_ENTRIES {
239          // Out of space, wait until next iteration.
240          let None = self.pending.insert(serial_no, (txn, fut_ctl)) else {
241            unreachable!();
242          };
243          break;
244        };
245        next_serial += 1;
246        for w in txn.writes {
247          let data_len: u32 = w.data.len().try_into().unwrap();
248          raw.extend_from_slice(&w.offset.to_be_bytes());
249          raw.extend_from_slice(&data_len.to_be_bytes());
250          raw.extend_from_slice(&w.data);
251          writes.push(WriteRequest::new(w.offset, w.data));
252          if w.is_overlay {
253            overlays_to_delete.push((w.offset, serial_no));
254          };
255        }
256        len += entry_len;
257        fut_ctls.push(fut_ctl);
258      }
259      if fut_ctls.is_empty() {
260        // Assert these are empty as each iteration expects to start with cleared reused Vec values.
261        assert!(overlays_to_delete.is_empty());
262        assert!(writes.is_empty());
263        continue;
264      };
265      raw.write_u32_be_at(OFFSETOF_LEN, u32::try_from(len).unwrap());
266      let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
267      raw.write_at(OFFSETOF_HASH, hash.as_bytes());
268      self
269        .device
270        .write_at_with_delayed_sync(once(WriteRequest::new(self.offset, raw)))
271        .await;
272
273      self
274        .device
275        .write_at_with_delayed_sync(writes.drain(..))
276        .await;
277
278      for fut_ctl in fut_ctls.drain(..) {
279        fut_ctl.signal(());
280      }
281
282      for (offset, serial_no) in overlays_to_delete.drain(..) {
283        self
284          .overlay
285          .remove_if(&offset, |_, e| e.serial_no <= serial_no);
286      }
287
288      // We cannot write_at_with_delayed_sync, as we may write to the journal again by then and have a conflict due to reordering.
289      self
290        .device
291        .write_at(self.offset, self.generate_blank_state())
292        .await;
293      self.device.sync_data().await;
294    }
295  }
296}