write_journal/
lib.rs

1pub mod merge;
2
3use crate::merge::merge_overlapping_writes;
4use dashmap::DashMap;
5use futures::stream::iter;
6use futures::StreamExt;
7use off64::int::Off64ReadInt;
8use off64::int::Off64WriteMutInt;
9use off64::usz;
10use off64::Off64Read;
11use off64::Off64WriteMut;
12use signal_future::SignalFuture;
13use signal_future::SignalFutureController;
14use std::collections::HashMap;
15use std::os::unix::fs::FileExt;
16use std::path::Path;
17use std::sync::atomic::AtomicU64;
18use std::sync::Arc;
19use tokio::fs::OpenOptions;
20use tokio::sync::mpsc::unbounded_channel;
21use tokio::sync::mpsc::UnboundedReceiver;
22use tokio::sync::mpsc::UnboundedSender;
23use tokio::task::spawn_blocking;
24use tracing::info;
25use tracing::warn;
26
// On-device journal layout: a 32-byte BLAKE3 hash, then a 4-byte big-endian
// length of the serialised entries that follow, then the entries themselves.
// The hash covers everything after itself (length field + entries).
const OFFSETOF_HASH: u64 = 0;
// The hash field is 32 bytes (BLAKE3 output size).
const OFFSETOF_LEN: u64 = OFFSETOF_HASH + 32;
// The length field is a u32 (4 bytes); it counts entry bytes only, excluding this header.
const OFFSETOF_ENTRIES: u64 = OFFSETOF_LEN + 4;
30
// Public so `Transaction` can be used elsewhere (not just WriteJournal) e.g. mock journals.
/// A single write recorded inside a `Transaction`: `data` destined for device `offset`.
pub struct TransactionWrite {
  // Absolute device offset the data will be written at.
  pub offset: u64,
  // The bytes to write at `offset`.
  pub data: Vec<u8>,
  // True if this write was also inserted into the overlay (`write_with_overlay`);
  // the commit loop uses this to know which overlay entries to evict after committing.
  pub is_overlay: bool,
}
37
/// A batch of writes committed to the journal atomically.
/// Obtained from `WriteJournal::begin_transaction` (or constructed directly for mock journals).
pub struct Transaction {
  // Monotonically increasing ID assigned by the journal; the commit loop processes
  // transactions strictly in serial order, and overlay eviction compares against it.
  serial_no: u64,
  // Writes in the order they were added.
  writes: Vec<TransactionWrite>,
  // Shared overlay map; `write_with_overlay` inserts into it immediately (before commit).
  overlay: Arc<DashMap<u64, OverlayEntry>>,
}
43
44impl Transaction {
45  fn serialised_byte_len(&self) -> u64 {
46    u64::try_from(
47      self
48        .writes
49        .iter()
50        .map(|w| 8 + 4 + w.data.len())
51        .sum::<usize>(),
52    )
53    .unwrap()
54  }
55
56  // Public so `Transaction` can be used elsewhere (not just WriteJournal) e.g. mock journals.
57  pub fn new(serial_no: u64, overlay: Arc<DashMap<u64, OverlayEntry>>) -> Self {
58    Self {
59      serial_no,
60      writes: Vec::new(),
61      overlay,
62    }
63  }
64
65  pub fn serial_no(&self) -> u64 {
66    self.serial_no
67  }
68
69  pub fn into_writes(self) -> Vec<TransactionWrite> {
70    self.writes
71  }
72
73  // Use generic so callers don't even need to `.into()` from Vec, array, etc.
74  pub fn write(&mut self, offset: u64, data: Vec<u8>) -> &mut Self {
75    self.writes.push(TransactionWrite {
76      offset,
77      data,
78      is_overlay: false,
79    });
80    self
81  }
82
83  /// WARNING: Use this function with caution, it's up to the caller to avoid the potential issues with misuse, including logic incorrectness, cache incoherency, and memory leaking. Carefully read notes/Overlay.md before using the overlay.
84  // Use generic so callers don't even need to `.into()` from Vec, array, etc.
85  pub fn write_with_overlay(&mut self, offset: u64, data: Vec<u8>) -> &mut Self {
86    self.overlay.insert(offset, OverlayEntry {
87      data: data.clone(),
88      serial_no: self.serial_no,
89    });
90    self.writes.push(TransactionWrite {
91      offset,
92      data,
93      is_overlay: true,
94    });
95    self
96  }
97}
98
// We cannot evict an overlay entry after a commit loop iteration if the data at the offset has since been updated again using the overlay while the commit loop was happening. This is why we need to track `serial_no`. This mechanism requires slower one-by-one deletes by the commit loop, but allows much faster parallel overlay reads with a DashMap. The alternative would be a RwLock over two maps, one for each generation, swapping them around after each loop iteration.
// Note that it's not necessary to ever evict for correctness (assuming the overlay is used correctly); eviction is done to avoid memory leaking.
// Public so `OverlayEntry` can be used elsewhere (not just WriteJournal) e.g. mock journals.
pub struct OverlayEntry {
  // Latest data written at this offset via the overlay.
  pub data: Vec<u8>,
  // Serial number of the transaction that wrote this entry; the commit loop's
  // `remove_if` only evicts entries whose serial_no is <= the committed one,
  // so data from a later in-flight transaction is never dropped.
  pub serial_no: u64,
}
106
107struct DsyncFile(Arc<std::fs::File>);
108
109impl DsyncFile {
110  pub async fn open(path: &Path) -> Self {
111    Self(
112      OpenOptions::new()
113        .read(true)
114        .write(true)
115        .custom_flags(libc::O_DSYNC)
116        .open(path)
117        .await
118        .unwrap()
119        .into_std()
120        .await
121        .into(),
122    )
123  }
124
125  pub async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
126    let file = self.0.clone();
127    spawn_blocking(move || {
128      let mut buf = vec![0u8; usz!(len)];
129      file.read_exact_at(&mut buf, offset).unwrap();
130      buf
131    })
132    .await
133    .unwrap()
134  }
135
136  pub async fn write_at(&self, offset: u64, data: Vec<u8>) {
137    let file = self.0.clone();
138    spawn_blocking(move || {
139      file.write_all_at(&data, offset).unwrap();
140    })
141    .await
142    .unwrap()
143  }
144}
145
/// A write-ahead journal occupying a fixed region of a device/file.
/// Transactions are queued to a background loop which serialises them into the
/// journal region (hash + length + entries), applies the writes, then erases the journal.
pub struct WriteJournal {
  // Device/file holding both the journal region and the journaled data.
  device: DsyncFile,
  // Absolute offset of the journal region on the device.
  offset: u64,
  // Size in bytes of the journal region; header + serialised entries must fit within it.
  capacity: u64,
  // Queue feeding the background commit loop; each item carries the signal used
  // to resolve the caller's `commit_transaction` future.
  sender: UnboundedSender<(Transaction, SignalFutureController)>,
  // Serial number to assign to the next transaction.
  next_txn_serial_no: AtomicU64,
  // Shared overlay; see the comments on `OverlayEntry`.
  overlay: Arc<DashMap<u64, OverlayEntry>>,
}
154
155impl WriteJournal {
156  pub async fn open(path: &Path, offset: u64, capacity: u64) -> Arc<Self> {
157    assert!(capacity > OFFSETOF_ENTRIES && capacity <= u32::MAX.into());
158    let (tx, rx) = unbounded_channel();
159    let journal = Arc::new(Self {
160      // We use DSYNC for more efficient fsync. All our writes must be immediately fsync'd — we don't use kernel buffer as intermediate storage, nor split writes — but fsync isn't granular (and sync_file_range is not portable). Delaying sync is even more pointless.
161      device: DsyncFile::open(path).await,
162      offset,
163      capacity,
164      sender: tx,
165      next_txn_serial_no: AtomicU64::new(0),
166      overlay: Default::default(),
167    });
168    tokio::task::spawn({
169      let journal = journal.clone();
170      async move { journal.start_commit_background_loop(rx).await }
171    });
172    journal
173  }
174
175  pub fn generate_blank_state(&self) -> Vec<u8> {
176    let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
177    raw.write_u32_be_at(OFFSETOF_LEN, 0u32);
178    let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
179    raw.write_at(OFFSETOF_HASH, hash.as_bytes());
180    raw
181  }
182
183  pub async fn format_device(&self) {
184    self
185      .device
186      .write_at(self.offset, self.generate_blank_state())
187      .await;
188  }
189
190  pub async fn recover(&self) {
191    let mut raw = self
192      .device
193      .read_at(self.offset, OFFSETOF_ENTRIES)
194      .await
195      .to_vec();
196    let len: u64 = raw.read_u32_be_at(OFFSETOF_LEN).into();
197    if len > self.capacity - OFFSETOF_ENTRIES {
198      warn!("journal is corrupt, has invalid length, skipping recovery");
199      return;
200    };
201    raw.extend_from_slice(
202      &mut self
203        .device
204        .read_at(self.offset + OFFSETOF_ENTRIES, len)
205        .await,
206    );
207    let expected_hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
208    let recorded_hash = &raw[..usz!(OFFSETOF_LEN)];
209    if expected_hash.as_bytes() != recorded_hash {
210      warn!("journal is corrupt, has invalid hash, skipping recovery");
211      return;
212    };
213    if len == 0 {
214      info!("journal is empty, no recovery necessary");
215      return;
216    };
217    let mut recovered_bytes_total = 0;
218    let mut journal_offset = OFFSETOF_ENTRIES;
219    while journal_offset < len {
220      let offset = raw.read_u64_be_at(journal_offset);
221      journal_offset += 8;
222      let data_len = raw.read_u32_be_at(journal_offset);
223      journal_offset += 4;
224      let data = raw.read_at(journal_offset, data_len.into()).to_vec();
225      journal_offset += u64::from(data_len);
226      self.device.write_at(offset, data).await;
227      recovered_bytes_total += data_len;
228    }
229
230    // WARNING: Make sure to sync writes BEFORE erasing journal.
231    self
232      .device
233      .write_at(self.offset, self.generate_blank_state())
234      .await;
235    info!(
236      recovered_entries = len,
237      recovered_bytes = recovered_bytes_total,
238      "journal has been recovered"
239    );
240  }
241
242  pub fn begin_transaction(&self) -> Transaction {
243    let serial_no = self
244      .next_txn_serial_no
245      .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
246    Transaction::new(serial_no, self.overlay.clone())
247  }
248
249  pub async fn commit_transaction(&self, txn: Transaction) {
250    let (fut, fut_ctl) = SignalFuture::new();
251    self.sender.send((txn, fut_ctl)).unwrap();
252    fut.await;
253  }
254
255  /// WARNING: Use this function with caution, it's up to the caller to avoid the potential issues with misuse, including logic incorrectness, cache incoherency, and memory leaking. Carefully read notes/Overlay.md before using the overlay.
256  pub async fn read_with_overlay(&self, offset: u64, len: u64) -> Vec<u8> {
257    if let Some(e) = self.overlay.get(&offset) {
258      let overlay_len = e.value().data.len();
259      assert_eq!(
260        overlay_len,
261        usz!(len),
262        "overlay data at {offset} has length {overlay_len} but requested length {len}"
263      );
264      e.value().data.clone()
265    } else {
266      self.device.read_at(offset, len).await
267    }
268  }
269
270  /// WARNING: Use this function with caution, it's up to the caller to avoid the potential issues with misuse, including logic incorrectness, cache incoherency, and memory leaking. Carefully read notes/Overlay.md before using the overlay.
271  /// WARNING: It's unlikely you want to to use this function, as it will cause cache coherency problems as this only removes the overlay entry, so stale device data will be read instead. You most likely want to write blank/default data instead. However, this is available if you know what you're doing and have a need.
272  pub async fn clear_from_overlay(&self, offset: u64) {
273    self.overlay.remove(&offset);
274  }
275
276  async fn start_commit_background_loop(
277    &self,
278    mut rx: UnboundedReceiver<(Transaction, SignalFutureController)>,
279  ) {
280    let mut next_serial = 0;
281    let mut pending = HashMap::new();
282    // There's no point to sleeping. We can only serially do I/O. So in the time we wait, we could have just done some I/O.
283    // Also, tokio::sleep granularity is 1 ms. Super high latency.
284    while let Some((txn, fut_ctl)) = rx.recv().await {
285      pending.insert(txn.serial_no, (txn, fut_ctl));
286
287      let mut len = 0;
288      let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
289      let mut writes = Vec::new();
290      let mut overlays_to_delete = Vec::new();
291      let mut fut_ctls = Vec::new();
292      // We must `remove` to take ownership of the write data and avoid copying. But this means we need to reinsert into the map if we cannot process a transaction in this iteration.
293      while let Some((serial_no, (txn, fut_ctl))) = pending.remove_entry(&next_serial) {
294        let entry_len = txn.serialised_byte_len();
295        if len + entry_len > self.capacity - OFFSETOF_ENTRIES {
296          // Out of space, wait until next iteration.
297          // TODO THIS MUST PANIC IF THE FIRST, OTHERWISE WE'LL LOOP FOREVER.
298          let None = pending.insert(serial_no, (txn, fut_ctl)) else {
299            unreachable!();
300          };
301          break;
302        };
303        next_serial += 1;
304        for w in txn.writes {
305          let data_len: u32 = w.data.len().try_into().unwrap();
306          raw.extend_from_slice(&w.offset.to_be_bytes());
307          raw.extend_from_slice(&data_len.to_be_bytes());
308          raw.extend_from_slice(&w.data);
309          writes.push((w.offset, w.data));
310          if w.is_overlay {
311            overlays_to_delete.push((w.offset, serial_no));
312          };
313        }
314        len += entry_len;
315        fut_ctls.push(fut_ctl);
316      }
317      if fut_ctls.is_empty() {
318        // Assert these are empty as each iteration expects to start with cleared reused Vec values.
319        assert!(overlays_to_delete.is_empty());
320        assert!(writes.is_empty());
321        continue;
322      };
323      raw.write_u32_be_at(OFFSETOF_LEN, u32::try_from(len).unwrap());
324      let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
325      raw.write_at(OFFSETOF_HASH, hash.as_bytes());
326      self.device.write_at(self.offset, raw).await;
327
328      // Keep semantic order without slowing down to serial writes.
329      let writes_ordered = merge_overlapping_writes(writes);
330      iter(writes_ordered)
331        .for_each_concurrent(None, async |(_, (offset, data))| {
332          self.device.write_at(offset, data).await;
333        })
334        .await;
335
336      for fut_ctl in fut_ctls.drain(..) {
337        fut_ctl.signal(());
338      }
339
340      for (offset, serial_no) in overlays_to_delete.drain(..) {
341        self
342          .overlay
343          .remove_if(&offset, |_, e| e.serial_no <= serial_no);
344      }
345
346      self
347        .device
348        .write_at(self.offset, self.generate_blank_state())
349        .await;
350    }
351  }
352}