// write_journal/lib.rs

use dashmap::DashMap;
use off64::int::Off64ReadInt;
use off64::int::Off64WriteMutInt;
use off64::usz;
use off64::Off64Read;
use off64::Off64WriteMut;
use seekable_async_file::SeekableAsyncFile;
use seekable_async_file::WriteRequest;
use signal_future::SignalFuture;
use signal_future::SignalFutureController;
use std::iter::once;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use tinybuf::TinyBuf;
use tokio::time::sleep;
use tracing::info;
use tracing::warn;

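// On-device journal layout: a 32-byte BLAKE3 hash of everything after it, then a 4-byte big-endian byte length of the entries, then the entries themselves. Each entry is an 8-byte big-endian device offset, a 4-byte big-endian data length, and then the data.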
const OFFSETOF_HASH: u64 = 0;
const OFFSETOF_LEN: u64 = OFFSETOF_HASH + 32;
const OFFSETOF_ENTRIES: u64 = OFFSETOF_LEN + 4;

// Public so `Transaction` can be used elsewhere (not just WriteJournal), e.g. mock journals.
pub struct TransactionWrite {
  pub offset: u64,
  pub data: TinyBuf,
  pub is_overlay: bool,
}

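/// An ordered batch of writes that the commit loop will persist atomically, in serial number order.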
pub struct Transaction {
  serial_no: u64,
  writes: Vec<TransactionWrite>,
  overlay: Arc<DashMap<u64, OverlayEntry>>,
}

impl Transaction {
  fn serialised_byte_len(&self) -> u64 {
    u64::try_from(
      self
        .writes
        .iter()
        .map(|w| 8 + 4 + w.data.len())
        .sum::<usize>(),
    )
    .unwrap()
  }

  // Public so `Transaction` can be used elsewhere (not just WriteJournal), e.g. mock journals.
  pub fn new(serial_no: u64, overlay: Arc<DashMap<u64, OverlayEntry>>) -> Self {
    Self {
      serial_no,
      writes: Vec::new(),
      overlay,
    }
  }

  pub fn serial_no(&self) -> u64 {
    self.serial_no
  }

  pub fn into_writes(self) -> Vec<TransactionWrite> {
    self.writes
  }

  // Use a generic so callers don't even need to `.into()` from Vec, array, etc.
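  /// Queues a write of `data` at device `offset` as part of this transaction.
  ///
  /// A minimal sketch of usage (not from the original docs; assumes `journal` is an existing `WriteJournal`):
  ///
  /// ```ignore
  /// let mut txn = journal.begin_transaction();
  /// txn.write(0, vec![1u8, 2, 3]);       // Vec<u8>
  /// txn.write(64, [0u8; 8]);             // fixed-size array
  /// txn.write(128, b"hello".as_slice()); // static byte slice
  /// journal.commit_transaction(txn).await;
  /// ```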
  pub fn write<D: Into<TinyBuf>>(&mut self, offset: u64, data: D) -> &mut Self {
    let data = data.into();
    self.writes.push(TransactionWrite {
      offset,
      data,
      is_overlay: false,
    });
    self
  }

  /// WARNING: Use this function with caution; it's up to the caller to avoid the pitfalls of misuse, including logic errors, cache incoherency, and memory leaks. Carefully read notes/Overlay.md before using the overlay.
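  ///
  /// A hedged sketch (not from the original docs): an overlay write becomes visible to `read_with_overlay` immediately, even before the commit loop has persisted the transaction.
  ///
  /// ```ignore
  /// let mut txn = journal.begin_transaction();
  /// txn.write_with_overlay(4096, vec![7u8; 16]);
  /// // Served from the overlay, not the device:
  /// let data = journal.read_with_overlay(4096, 16).await;
  /// journal.commit_transaction(txn).await;
  /// ```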
  // Use a generic so callers don't even need to `.into()` from Vec, array, etc.
  pub fn write_with_overlay<D: Into<TinyBuf>>(&mut self, offset: u64, data: D) -> &mut Self {
    let data = data.into();
    self.overlay.insert(offset, OverlayEntry {
      data: data.clone(),
      serial_no: self.serial_no,
    });
    self.writes.push(TransactionWrite {
      offset,
      data,
      is_overlay: true,
    });
    self
  }
}

// We cannot evict an overlay entry after a commit loop iteration if the data at that offset has since been updated again via the overlay while the commit loop was running. This is why we need to track `serial_no`. This mechanism requires slower one-by-one deletes by the commit loop, but allows much faster parallel overlay reads with a DashMap. The alternative would be a RwLock over two maps, one for each generation, swapped around after each loop iteration.
// Note that it's never necessary to evict for correctness (assuming the overlay is used correctly); eviction is done only to avoid leaking memory.
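// For example: transaction 3 overlay-writes offset X, the commit loop takes transaction 3 and begins persisting it, and meanwhile transaction 5 overlay-writes offset X again. After persisting, the loop must only evict the entry at X if its `serial_no` is still <= 3; an unconditional `remove` would drop transaction 5's newer data.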
pub struct OverlayEntry {
  data: TinyBuf,
  serial_no: u64,
}

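/// A write-ahead journal over a region of a `SeekableAsyncFile`. Transactions queued via `commit_transaction` are durably persisted, in serial number order, by `start_commit_background_loop`.
///
/// A minimal usage sketch (not from the original docs; assumes an already-opened `SeekableAsyncFile` named `device` and one of the `io_mmap`/`io_file` features):
///
/// ```ignore
/// let journal = Arc::new(WriteJournal::new(device, 0, 1024 * 1024, Duration::from_micros(200)));
/// journal.recover().await;
/// tokio::spawn({
///   let journal = journal.clone();
///   async move { journal.start_commit_background_loop().await }
/// });
/// let mut txn = journal.begin_transaction();
/// txn.write(1 << 20, vec![0u8; 512]); // Write past the journal region at [0, 1 MiB).
/// journal.commit_transaction(txn).await;
/// ```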
pub struct WriteJournal {
  device: SeekableAsyncFile,
  offset: u64,
  capacity: u64,
  pending: DashMap<u64, (Transaction, SignalFutureController)>,
  next_txn_serial_no: AtomicU64,
  commit_delay: Duration,
  overlay: Arc<DashMap<u64, OverlayEntry>>,
}

impl WriteJournal {
  pub fn new(
    device: SeekableAsyncFile,
    offset: u64,
    capacity: u64,
    commit_delay: Duration,
  ) -> Self {
    assert!(capacity > OFFSETOF_ENTRIES && capacity <= u32::MAX.into());
    Self {
      device,
      offset,
      capacity,
      pending: Default::default(),
      next_txn_serial_no: AtomicU64::new(0),
      commit_delay,
      overlay: Default::default(),
    }
  }

  pub fn generate_blank_state(&self) -> Vec<u8> {
    let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
    raw.write_u32_be_at(OFFSETOF_LEN, 0u32);
    let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
    raw.write_at(OFFSETOF_HASH, hash.as_bytes());
    raw
  }

  #[cfg(any(feature = "io_mmap", feature = "io_file"))]
  pub async fn format_device(&self) {
    self
      .device
      .write_at(self.offset, self.generate_blank_state())
      .await;
  }

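  /// Replays any journal left by a previous run: verifies the header hash, applies each recorded write to the device, syncs, and only then erases the journal. A corrupt journal is skipped with a warning; an empty one requires no recovery.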
  #[cfg(any(feature = "io_mmap", feature = "io_file"))]
  pub async fn recover(&self) {
    let mut raw = self
      .device
      .read_at(self.offset, OFFSETOF_ENTRIES)
      .await
      .to_vec();
    let len: u64 = raw.read_u32_be_at(OFFSETOF_LEN).into();
    if len > self.capacity - OFFSETOF_ENTRIES {
      warn!("journal is corrupt, has invalid length, skipping recovery");
      return;
    };
    raw.extend_from_slice(
      &self
        .device
        .read_at(self.offset + OFFSETOF_ENTRIES, len)
        .await,
    );
    let expected_hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
    let recorded_hash = &raw[..usz!(OFFSETOF_LEN)];
    if expected_hash.as_bytes() != recorded_hash {
      warn!("journal is corrupt, has invalid hash, skipping recovery");
      return;
    };
    if len == 0 {
      info!("journal is empty, no recovery necessary");
      return;
    };
    let mut recovered_entries = 0;
    let mut recovered_bytes_total = 0;
    let mut journal_offset = OFFSETOF_ENTRIES;
    // Entries occupy [OFFSETOF_ENTRIES, OFFSETOF_ENTRIES + len) in `raw`, as `len` counts entry bytes only.
    while journal_offset < OFFSETOF_ENTRIES + len {
      let offset = raw.read_u64_be_at(journal_offset);
      journal_offset += 8;
      let data_len = raw.read_u32_be_at(journal_offset);
      journal_offset += 4;
      let data = TinyBuf::from_slice(raw.read_at(journal_offset, data_len.into()));
      journal_offset += u64::from(data_len);
      self.device.write_at(offset, data).await;
      recovered_entries += 1;
      recovered_bytes_total += data_len;
    }
    self.device.sync_data().await;

    // WARNING: Make sure to sync writes BEFORE erasing the journal.
    self
      .device
      .write_at(self.offset, self.generate_blank_state())
      .await;
    self.device.sync_data().await;
    info!(
      recovered_entries,
      recovered_bytes = recovered_bytes_total,
      "journal has been recovered"
    );
  }

  pub fn begin_transaction(&self) -> Transaction {
    let serial_no = self
      .next_txn_serial_no
      .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    Transaction::new(serial_no, self.overlay.clone())
  }

  pub async fn commit_transaction(&self, txn: Transaction) {
    let (fut, fut_ctl) = SignalFuture::new();
    let None = self.pending.insert(txn.serial_no, (txn, fut_ctl)) else {
      unreachable!();
    };
    fut.await;
  }

  /// WARNING: Use this function with caution; it's up to the caller to avoid the pitfalls of misuse, including logic errors, cache incoherency, and memory leaks. Carefully read notes/Overlay.md before using the overlay.
  #[cfg(any(feature = "io_mmap", feature = "io_file"))]
  pub async fn read_with_overlay(&self, offset: u64, len: u64) -> TinyBuf {
    if let Some(e) = self.overlay.get(&offset) {
      let overlay_len = e.value().data.len();
      assert_eq!(
        overlay_len,
        usz!(len),
        "overlay data at {offset} has length {overlay_len} but requested length is {len}"
      );
      e.value().data.clone()
    } else {
      self.device.read_at(offset, len).await.into()
    }
  }

  /// WARNING: Use this function with caution; it's up to the caller to avoid the pitfalls of misuse, including logic errors, cache incoherency, and memory leaks. Carefully read notes/Overlay.md before using the overlay.
  /// WARNING: It's unlikely you want to use this function, as it only removes the overlay entry, so stale device data will be read instead, causing cache coherency problems. You most likely want to write blank/default data instead. However, this is available if you know what you're doing and have a need.
  pub async fn clear_from_overlay(&self, offset: u64) {
    self.overlay.remove(&offset);
  }

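  /// Runs the commit loop forever: every `commit_delay`, it drains pending transactions in serial number order, persists them to the journal, applies the writes to the device, wakes up committers, evicts satisfied overlay entries, and finally erases the journal.
  ///
  /// A hedged usage sketch (not from the original docs): spawn this once on a long-lived task, e.g. `tokio::spawn(async move { journal.start_commit_background_loop().await });`.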
  #[cfg(any(feature = "io_mmap", feature = "io_file"))]
  pub async fn start_commit_background_loop(&self) {
    let mut next_serial = 0;

    // These live outside the loop and are cleared after each iteration to avoid reallocations.
    let mut writes = Vec::new();
    let mut fut_ctls = Vec::new();
    let mut overlays_to_delete = Vec::new();
    loop {
      sleep(self.commit_delay).await;

      let mut len = 0;
      let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
      // We must `remove` to take ownership of the write data and avoid copying, but this means we need to reinsert into the map if we cannot process a transaction in this iteration.
      while let Some((serial_no, (txn, fut_ctl))) = self.pending.remove(&next_serial) {
        let entry_len = txn.serialised_byte_len();
        if len + entry_len > self.capacity - OFFSETOF_ENTRIES {
          // Out of space; wait until the next iteration.
          // TODO: This must panic if it's the first transaction taken this iteration; otherwise a transaction larger than the journal capacity will be reinserted forever and we'll loop indefinitely.
          let None = self.pending.insert(serial_no, (txn, fut_ctl)) else {
            unreachable!();
          };
          break;
        };
        next_serial += 1;
        for w in txn.writes {
          let data_len: u32 = w.data.len().try_into().unwrap();
          raw.extend_from_slice(&w.offset.to_be_bytes());
          raw.extend_from_slice(&data_len.to_be_bytes());
          raw.extend_from_slice(&w.data);
          writes.push(WriteRequest::new(w.offset, w.data));
          if w.is_overlay {
            overlays_to_delete.push((w.offset, serial_no));
          };
        }
        len += entry_len;
        fut_ctls.push(fut_ctl);
      }
      if fut_ctls.is_empty() {
        // Assert these are empty, as each iteration expects to start with cleared reused Vec values.
        assert!(overlays_to_delete.is_empty());
        assert!(writes.is_empty());
        continue;
      };
      raw.write_u32_be_at(OFFSETOF_LEN, u32::try_from(len).unwrap());
      let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
      raw.write_at(OFFSETOF_HASH, hash.as_bytes());
      // Persist the journal first, so the data writes below can be recovered if we crash mid-write.
      self
        .device
        .write_at_with_delayed_sync(once(WriteRequest::new(self.offset, raw)))
        .await;

      self
        .device
        .write_at_with_delayed_sync(writes.drain(..))
        .await;

      for fut_ctl in fut_ctls.drain(..) {
        fut_ctl.signal(());
      }

      for (offset, serial_no) in overlays_to_delete.drain(..) {
        self
          .overlay
          .remove_if(&offset, |_, e| e.serial_no <= serial_no);
      }

      // We cannot use `write_at_with_delayed_sync` here, as we may write to the journal again before the delayed sync happens, and the reordering would cause a conflict.
      self
        .device
        .write_at(self.offset, self.generate_blank_state())
        .await;
      self.device.sync_data().await;
    }
  }
}