write_journal/
lib.rs

1pub mod merge;
2
3use crate::merge::merge_overlapping_writes;
4use dashmap::DashMap;
5use futures::stream::iter;
6use futures::StreamExt;
7use off64::int::Off64ReadInt;
8use off64::int::Off64WriteMutInt;
9use off64::usz;
10use off64::Off64Read;
11use off64::Off64WriteMut;
12use signal_future::SignalFuture;
13use signal_future::SignalFutureController;
14use std::collections::HashMap;
15use std::os::unix::fs::FileExt;
16use std::path::Path;
17use std::sync::atomic::AtomicU64;
18use std::sync::Arc;
19use tokio::fs::OpenOptions;
20use tokio::sync::mpsc::unbounded_channel;
21use tokio::sync::mpsc::UnboundedReceiver;
22use tokio::sync::mpsc::UnboundedSender;
23use tokio::task::spawn_blocking;
24use tracing::info;
25use tracing::warn;
26
// On-device journal layout: [32-byte BLAKE3 hash][u32 big-endian length of serialised entries][entries...].
// The hash covers everything after the hash field itself (length + entries).
const OFFSETOF_HASH: u64 = 0;
// Hash field is 32 bytes (BLAKE3 output size).
const OFFSETOF_LEN: u64 = OFFSETOF_HASH + 32;
// Length field is a big-endian u32 (4 bytes); entries begin immediately after.
const OFFSETOF_ENTRIES: u64 = OFFSETOF_LEN + 4;
30
// Public so `Transaction` can be used elsewhere (not just WriteJournal) e.g. mock journals.
/// A single queued write: `data` is persisted at byte `offset` on the device when the owning transaction commits.
#[derive(Debug)]
pub struct TransactionWrite {
  pub offset: u64,
  pub data: Vec<u8>,
  // True if this write was also inserted into the journal's overlay map (via `write_with_overlay`);
  // the commit loop uses this to know which overlay entries are candidates for eviction after persisting.
  pub is_overlay: bool,
}
38
/// An ordered batch of writes that is journalled and then applied to the device as a unit by the commit background loop.
#[derive(Debug)]
pub struct Transaction {
  // Assigned by `WriteJournal::begin_transaction`; the commit loop processes transactions strictly in serial-number order.
  serial_no: u64,
  // Writes in the order they were queued.
  writes: Vec<TransactionWrite>,
  // Shared overlay map owned by the journal; `write_with_overlay` inserts into it.
  overlay: Arc<DashMap<u64, OverlayEntry>>,
}
45
impl Transaction {
  /// Number of bytes this transaction occupies when serialised into the journal:
  /// 8 (offset) + 4 (data length) + data bytes, summed over all queued writes.
  fn serialised_byte_len(&self) -> u64 {
    u64::try_from(
      self
        .writes
        .iter()
        .map(|w| 8 + 4 + w.data.len())
        .sum::<usize>(),
    )
    .unwrap()
  }

  // Public so `Transaction` can be used elsewhere (not just WriteJournal) e.g. mock journals.
  /// Creates an empty transaction with the given serial number, sharing the journal's overlay map.
  pub fn new(serial_no: u64, overlay: Arc<DashMap<u64, OverlayEntry>>) -> Self {
    Self {
      serial_no,
      writes: Vec::new(),
      overlay,
    }
  }

  /// Returns this transaction's serial number.
  pub fn serial_no(&self) -> u64 {
    self.serial_no
  }

  /// Consumes the transaction, yielding its queued writes.
  pub fn into_writes(self) -> Vec<TransactionWrite> {
    self.writes
  }

  /// Queues a write of `data` at device byte `offset`.
  // NOTE(review): a previous comment here claimed a generic parameter ("so callers don't even
  // need to `.into()`"), but the signature takes `Vec<u8>` directly; comment corrected to match.
  pub fn write(&mut self, offset: u64, data: Vec<u8>) -> &mut Self {
    self.writes.push(TransactionWrite {
      offset,
      data,
      is_overlay: false,
    });
    self
  }

  /// WARNING: Use this function with caution, it's up to the caller to avoid the potential issues with misuse, including logic incorrectness, cache incoherency, and memory leaking. Carefully read notes/Overlay.md before using the overlay.
  /// Queues a write of `data` at `offset` and also inserts the data into the shared overlay,
  /// so `read_with_overlay` observes it immediately, before the commit loop persists it.
  pub fn write_with_overlay(&mut self, offset: u64, data: Vec<u8>) -> &mut Self {
    self.overlay.insert(offset, OverlayEntry {
      data: data.clone(),
      serial_no: self.serial_no,
    });
    self.writes.push(TransactionWrite {
      offset,
      data,
      is_overlay: true,
    });
    self
  }
}
100
// We cannot evict an overlay entry after a commit loop iteration if the data at the offset has since been updated again using the overlay while the commit loop was happening. This is why we need to track `serial_no`. This mechanism requires slower one-by-one deletes by the commit loop, but allows much faster parallel overlay reads with a DashMap. The alternative would be a RwLock over two maps, one for each generation, swapping them around after each loop iteration.
// Note that it's not necessary to ever evict for correctness (assuming the overlay is used correctly); eviction is done to avoid memory leaking.
// Public so `OverlayEntry` can be used elsewhere (not just WriteJournal) e.g. mock journals.
#[derive(Debug)]
pub struct OverlayEntry {
  // The most recently written data for this offset.
  pub data: Vec<u8>,
  // Serial number of the transaction that wrote this entry; the commit loop only evicts an entry whose serial_no is <= the committed transaction's, so a newer overlay write is never lost.
  pub serial_no: u64,
}
109
110struct DsyncFile(Arc<std::fs::File>);
111
112impl DsyncFile {
113  pub async fn open(path: &Path) -> Self {
114    Self(
115      OpenOptions::new()
116        .read(true)
117        .write(true)
118        .custom_flags(libc::O_DSYNC)
119        .open(path)
120        .await
121        .unwrap()
122        .into_std()
123        .await
124        .into(),
125    )
126  }
127
128  pub async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
129    let file = self.0.clone();
130    spawn_blocking(move || {
131      let mut buf = vec![0u8; usz!(len)];
132      file.read_exact_at(&mut buf, offset).unwrap();
133      buf
134    })
135    .await
136    .unwrap()
137  }
138
139  pub async fn write_at(&self, offset: u64, data: Vec<u8>) {
140    let file = self.0.clone();
141    spawn_blocking(move || {
142      file.write_all_at(&data, offset).unwrap();
143    })
144    .await
145    .unwrap()
146  }
147}
148
/// A write-ahead journal occupying a fixed region of a device: transactions are serialised into
/// the journal region first, then applied to their target offsets, so a crash mid-apply can be
/// recovered by replaying the journal.
pub struct WriteJournal {
  device: DsyncFile,
  // Byte offset on the device where the journal region begins.
  offset: u64,
  // Size in bytes of the journal region (header + entries must fit within it).
  capacity: u64,
  // Queue feeding the commit background loop; each message carries the transaction and the future controller to signal on completion.
  sender: UnboundedSender<(Transaction, SignalFutureController)>,
  // Source of serial numbers handed to `begin_transaction`.
  next_txn_serial_no: AtomicU64,
  // Shared overlay map; see notes on `OverlayEntry` for eviction semantics.
  overlay: Arc<DashMap<u64, OverlayEntry>>,
}
157
158impl WriteJournal {
159  pub async fn open(path: &Path, offset: u64, capacity: u64) -> Arc<Self> {
160    assert!(capacity > OFFSETOF_ENTRIES && capacity <= u32::MAX.into());
161    let (tx, rx) = unbounded_channel();
162    let journal = Arc::new(Self {
163      // We use DSYNC for more efficient fsync. All our writes must be immediately fsync'd — we don't use kernel buffer as intermediate storage, nor split writes — but fsync isn't granular (and sync_file_range is not portable). Delaying sync is even more pointless.
164      device: DsyncFile::open(path).await,
165      offset,
166      capacity,
167      sender: tx,
168      next_txn_serial_no: AtomicU64::new(0),
169      overlay: Default::default(),
170    });
171    tokio::task::spawn({
172      let journal = journal.clone();
173      async move { journal.start_commit_background_loop(rx).await }
174    });
175    journal
176  }
177
178  pub fn generate_blank_state(&self) -> Vec<u8> {
179    let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
180    raw.write_u32_be_at(OFFSETOF_LEN, 0u32);
181    let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
182    raw.write_at(OFFSETOF_HASH, hash.as_bytes());
183    raw
184  }
185
186  pub async fn format_device(&self) {
187    self
188      .device
189      .write_at(self.offset, self.generate_blank_state())
190      .await;
191  }
192
193  pub async fn recover(&self) {
194    let mut raw = self
195      .device
196      .read_at(self.offset, OFFSETOF_ENTRIES)
197      .await
198      .to_vec();
199    let len: u64 = raw.read_u32_be_at(OFFSETOF_LEN).into();
200    if len > self.capacity - OFFSETOF_ENTRIES {
201      warn!("journal is corrupt, has invalid length, skipping recovery");
202      return;
203    };
204    raw.extend_from_slice(
205      &mut self
206        .device
207        .read_at(self.offset + OFFSETOF_ENTRIES, len)
208        .await,
209    );
210    let expected_hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
211    let recorded_hash = &raw[..usz!(OFFSETOF_LEN)];
212    if expected_hash.as_bytes() != recorded_hash {
213      warn!("journal is corrupt, has invalid hash, skipping recovery");
214      return;
215    };
216    if len == 0 {
217      info!("journal is empty, no recovery necessary");
218      return;
219    };
220    let mut recovered_bytes_total = 0;
221    let mut journal_offset = OFFSETOF_ENTRIES;
222    while journal_offset < len {
223      let offset = raw.read_u64_be_at(journal_offset);
224      journal_offset += 8;
225      let data_len = raw.read_u32_be_at(journal_offset);
226      journal_offset += 4;
227      let data = raw.read_at(journal_offset, data_len.into()).to_vec();
228      journal_offset += u64::from(data_len);
229      self.device.write_at(offset, data).await;
230      recovered_bytes_total += data_len;
231    }
232
233    // WARNING: Make sure to sync writes BEFORE erasing journal.
234    self
235      .device
236      .write_at(self.offset, self.generate_blank_state())
237      .await;
238    info!(
239      recovered_entries = len,
240      recovered_bytes = recovered_bytes_total,
241      "journal has been recovered"
242    );
243  }
244
245  pub fn begin_transaction(&self) -> Transaction {
246    let serial_no = self
247      .next_txn_serial_no
248      .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
249    Transaction::new(serial_no, self.overlay.clone())
250  }
251
252  pub async fn commit_transaction(&self, txn: Transaction) {
253    let (fut, fut_ctl) = SignalFuture::new();
254    self.sender.send((txn, fut_ctl)).unwrap();
255    fut.await;
256  }
257
258  /// WARNING: Use this function with caution, it's up to the caller to avoid the potential issues with misuse, including logic incorrectness, cache incoherency, and memory leaking. Carefully read notes/Overlay.md before using the overlay.
259  pub async fn read_with_overlay(&self, offset: u64, len: u64) -> Vec<u8> {
260    if let Some(e) = self.overlay.get(&offset) {
261      let overlay_len = e.value().data.len();
262      assert_eq!(
263        overlay_len,
264        usz!(len),
265        "overlay data at {offset} has length {overlay_len} but requested length {len}"
266      );
267      e.value().data.clone()
268    } else {
269      self.device.read_at(offset, len).await
270    }
271  }
272
273  /// WARNING: Use this function with caution, it's up to the caller to avoid the potential issues with misuse, including logic incorrectness, cache incoherency, and memory leaking. Carefully read notes/Overlay.md before using the overlay.
274  /// WARNING: It's unlikely you want to to use this function, as it will cause cache coherency problems as this only removes the overlay entry, so stale device data will be read instead. You most likely want to write blank/default data instead. However, this is available if you know what you're doing and have a need.
275  pub async fn clear_from_overlay(&self, offset: u64) {
276    self.overlay.remove(&offset);
277  }
278
279  async fn start_commit_background_loop(
280    &self,
281    mut rx: UnboundedReceiver<(Transaction, SignalFutureController)>,
282  ) {
283    let mut next_serial = 0;
284    let mut pending = HashMap::new();
285    // There's no point to sleeping. We can only serially do I/O. So in the time we wait, we could have just done some I/O.
286    // Also, tokio::sleep granularity is 1 ms. Super high latency.
287    while let Some((txn, fut_ctl)) = rx.recv().await {
288      pending.insert(txn.serial_no, (txn, fut_ctl));
289
290      let mut len = 0;
291      let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
292      let mut writes = Vec::new();
293      let mut overlays_to_delete = Vec::new();
294      let mut fut_ctls = Vec::new();
295      // We must `remove` to take ownership of the write data and avoid copying. But this means we need to reinsert into the map if we cannot process a transaction in this iteration.
296      while let Some((serial_no, (txn, fut_ctl))) = pending.remove_entry(&next_serial) {
297        let entry_len = txn.serialised_byte_len();
298        if len + entry_len > self.capacity - OFFSETOF_ENTRIES {
299          // Out of space, wait until next iteration.
300          // TODO THIS MUST PANIC IF THE FIRST, OTHERWISE WE'LL LOOP FOREVER.
301          let None = pending.insert(serial_no, (txn, fut_ctl)) else {
302            unreachable!();
303          };
304          break;
305        };
306        next_serial += 1;
307        for w in txn.writes {
308          let data_len: u32 = w.data.len().try_into().unwrap();
309          raw.extend_from_slice(&w.offset.to_be_bytes());
310          raw.extend_from_slice(&data_len.to_be_bytes());
311          raw.extend_from_slice(&w.data);
312          writes.push((w.offset, w.data));
313          if w.is_overlay {
314            overlays_to_delete.push((w.offset, serial_no));
315          };
316        }
317        len += entry_len;
318        fut_ctls.push(fut_ctl);
319      }
320      if fut_ctls.is_empty() {
321        // Assert these are empty as each iteration expects to start with cleared reused Vec values.
322        assert!(overlays_to_delete.is_empty());
323        assert!(writes.is_empty());
324        continue;
325      };
326      raw.write_u32_be_at(OFFSETOF_LEN, u32::try_from(len).unwrap());
327      let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
328      raw.write_at(OFFSETOF_HASH, hash.as_bytes());
329      self.device.write_at(self.offset, raw).await;
330
331      // Keep semantic order without slowing down to serial writes.
332      let writes_ordered = merge_overlapping_writes(writes);
333      iter(writes_ordered)
334        .for_each_concurrent(None, async |(offset, data)| {
335          self.device.write_at(offset, data).await;
336        })
337        .await;
338
339      for fut_ctl in fut_ctls.drain(..) {
340        fut_ctl.signal(());
341      }
342
343      for (offset, serial_no) in overlays_to_delete.drain(..) {
344        self
345          .overlay
346          .remove_if(&offset, |_, e| e.serial_no <= serial_no);
347      }
348
349      self
350        .device
351        .write_at(self.offset, self.generate_blank_state())
352        .await;
353    }
354  }
355}