1pub mod merge;
2
3use crate::merge::merge_overlapping_writes;
4use dashmap::DashMap;
5use futures::stream::iter;
6use futures::StreamExt;
7use off64::int::Off64ReadInt;
8use off64::int::Off64WriteMutInt;
9use off64::usz;
10use off64::Off64Read;
11use off64::Off64WriteMut;
12use signal_future::SignalFuture;
13use signal_future::SignalFutureController;
14use std::collections::HashMap;
15use std::os::unix::fs::FileExt;
16use std::path::Path;
17use std::sync::atomic::AtomicU64;
18use std::sync::Arc;
19use tokio::fs::OpenOptions;
20use tokio::sync::mpsc::unbounded_channel;
21use tokio::sync::mpsc::UnboundedReceiver;
22use tokio::sync::mpsc::UnboundedSender;
23use tokio::task::spawn_blocking;
24use tracing::info;
25use tracing::warn;
26
/// Byte offset, within the journal region, of the 32-byte BLAKE3 hash that covers every byte
/// after it.
const OFFSETOF_HASH: u64 = 0;
/// Byte offset of the big-endian `u32` holding the byte length of the entries region.
const OFFSETOF_LEN: u64 = OFFSETOF_HASH + 32;
/// Byte offset at which serialised entries begin, i.e. the total size of the journal header.
const OFFSETOF_ENTRIES: u64 = OFFSETOF_LEN + std::mem::size_of::<u32>() as u64;
30
/// One write recorded by a `Transaction`.
#[derive(Debug)]
pub struct TransactionWrite {
  /// Device offset at which `data` is to be written.
  pub offset: u64,
  /// Bytes to write.
  pub data: Vec<u8>,
  /// Whether the data was also published to the journal's overlay (via `write_with_overlay`),
  /// so its overlay entry should be dropped once the write has reached the device.
  pub is_overlay: bool,
}
38
39#[derive(Debug)]
40pub struct Transaction {
41 serial_no: u64,
42 writes: Vec<TransactionWrite>,
43 overlay: Arc<DashMap<u64, OverlayEntry>>,
44}
45
46impl Transaction {
47 fn serialised_byte_len(&self) -> u64 {
48 u64::try_from(
49 self
50 .writes
51 .iter()
52 .map(|w| 8 + 4 + w.data.len())
53 .sum::<usize>(),
54 )
55 .unwrap()
56 }
57
58 pub fn new(serial_no: u64, overlay: Arc<DashMap<u64, OverlayEntry>>) -> Self {
60 Self {
61 serial_no,
62 writes: Vec::new(),
63 overlay,
64 }
65 }
66
67 pub fn serial_no(&self) -> u64 {
68 self.serial_no
69 }
70
71 pub fn into_writes(self) -> Vec<TransactionWrite> {
72 self.writes
73 }
74
75 pub fn write(&mut self, offset: u64, data: Vec<u8>) -> &mut Self {
77 self.writes.push(TransactionWrite {
78 offset,
79 data,
80 is_overlay: false,
81 });
82 self
83 }
84
85 pub fn write_with_overlay(&mut self, offset: u64, data: Vec<u8>) -> &mut Self {
88 self.overlay.insert(offset, OverlayEntry {
89 data: data.clone(),
90 serial_no: self.serial_no,
91 });
92 self.writes.push(TransactionWrite {
93 offset,
94 data,
95 is_overlay: true,
96 });
97 self
98 }
99}
100
/// Data published by `Transaction::write_with_overlay` that may not have reached the device yet.
#[derive(Debug)]
pub struct OverlayEntry {
  /// The bytes most recently published for this offset.
  pub data: Vec<u8>,
  /// Serial number of the transaction that published `data`; used so that committing a
  /// transaction only removes overlay entries at or before its own serial number.
  pub serial_no: u64,
}
109
110struct DsyncFile(Arc<std::fs::File>);
111
112impl DsyncFile {
113 pub async fn open(path: &Path) -> Self {
114 Self(
115 OpenOptions::new()
116 .read(true)
117 .write(true)
118 .custom_flags(libc::O_DSYNC)
119 .open(path)
120 .await
121 .unwrap()
122 .into_std()
123 .await
124 .into(),
125 )
126 }
127
128 pub async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
129 let file = self.0.clone();
130 spawn_blocking(move || {
131 let mut buf = vec![0u8; usz!(len)];
132 file.read_exact_at(&mut buf, offset).unwrap();
133 buf
134 })
135 .await
136 .unwrap()
137 }
138
139 pub async fn write_at(&self, offset: u64, data: Vec<u8>) {
140 let file = self.0.clone();
141 spawn_blocking(move || {
142 file.write_all_at(&data, offset).unwrap();
143 })
144 .await
145 .unwrap()
146 }
147}
148
149pub struct WriteJournal {
150 device: DsyncFile,
151 offset: u64,
152 capacity: u64,
153 sender: UnboundedSender<(Transaction, SignalFutureController)>,
154 next_txn_serial_no: AtomicU64,
155 overlay: Arc<DashMap<u64, OverlayEntry>>,
156}
157
158impl WriteJournal {
159 pub async fn open(path: &Path, offset: u64, capacity: u64) -> Arc<Self> {
160 assert!(capacity > OFFSETOF_ENTRIES && capacity <= u32::MAX.into());
161 let (tx, rx) = unbounded_channel();
162 let journal = Arc::new(Self {
163 device: DsyncFile::open(path).await,
165 offset,
166 capacity,
167 sender: tx,
168 next_txn_serial_no: AtomicU64::new(0),
169 overlay: Default::default(),
170 });
171 tokio::task::spawn({
172 let journal = journal.clone();
173 async move { journal.start_commit_background_loop(rx).await }
174 });
175 journal
176 }
177
178 pub fn generate_blank_state(&self) -> Vec<u8> {
179 let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
180 raw.write_u32_be_at(OFFSETOF_LEN, 0u32);
181 let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
182 raw.write_at(OFFSETOF_HASH, hash.as_bytes());
183 raw
184 }
185
186 pub async fn format_device(&self) {
187 self
188 .device
189 .write_at(self.offset, self.generate_blank_state())
190 .await;
191 }
192
193 pub async fn recover(&self) {
194 let mut raw = self
195 .device
196 .read_at(self.offset, OFFSETOF_ENTRIES)
197 .await
198 .to_vec();
199 let len: u64 = raw.read_u32_be_at(OFFSETOF_LEN).into();
200 if len > self.capacity - OFFSETOF_ENTRIES {
201 warn!("journal is corrupt, has invalid length, skipping recovery");
202 return;
203 };
204 raw.extend_from_slice(
205 &mut self
206 .device
207 .read_at(self.offset + OFFSETOF_ENTRIES, len)
208 .await,
209 );
210 let expected_hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
211 let recorded_hash = &raw[..usz!(OFFSETOF_LEN)];
212 if expected_hash.as_bytes() != recorded_hash {
213 warn!("journal is corrupt, has invalid hash, skipping recovery");
214 return;
215 };
216 if len == 0 {
217 info!("journal is empty, no recovery necessary");
218 return;
219 };
220 let mut recovered_bytes_total = 0;
221 let mut journal_offset = OFFSETOF_ENTRIES;
222 while journal_offset < len {
223 let offset = raw.read_u64_be_at(journal_offset);
224 journal_offset += 8;
225 let data_len = raw.read_u32_be_at(journal_offset);
226 journal_offset += 4;
227 let data = raw.read_at(journal_offset, data_len.into()).to_vec();
228 journal_offset += u64::from(data_len);
229 self.device.write_at(offset, data).await;
230 recovered_bytes_total += data_len;
231 }
232
233 self
235 .device
236 .write_at(self.offset, self.generate_blank_state())
237 .await;
238 info!(
239 recovered_entries = len,
240 recovered_bytes = recovered_bytes_total,
241 "journal has been recovered"
242 );
243 }
244
245 pub fn begin_transaction(&self) -> Transaction {
246 let serial_no = self
247 .next_txn_serial_no
248 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
249 Transaction::new(serial_no, self.overlay.clone())
250 }
251
252 pub async fn commit_transaction(&self, txn: Transaction) {
253 let (fut, fut_ctl) = SignalFuture::new();
254 self.sender.send((txn, fut_ctl)).unwrap();
255 fut.await;
256 }
257
258 pub async fn read_with_overlay(&self, offset: u64, len: u64) -> Vec<u8> {
260 if let Some(e) = self.overlay.get(&offset) {
261 let overlay_len = e.value().data.len();
262 assert_eq!(
263 overlay_len,
264 usz!(len),
265 "overlay data at {offset} has length {overlay_len} but requested length {len}"
266 );
267 e.value().data.clone()
268 } else {
269 self.device.read_at(offset, len).await
270 }
271 }
272
273 pub async fn clear_from_overlay(&self, offset: u64) {
276 self.overlay.remove(&offset);
277 }
278
279 async fn start_commit_background_loop(
280 &self,
281 mut rx: UnboundedReceiver<(Transaction, SignalFutureController)>,
282 ) {
283 let mut next_serial = 0;
284 let mut pending = HashMap::new();
285 while let Some((txn, fut_ctl)) = rx.recv().await {
288 pending.insert(txn.serial_no, (txn, fut_ctl));
289
290 let mut len = 0;
291 let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
292 let mut writes = Vec::new();
293 let mut overlays_to_delete = Vec::new();
294 let mut fut_ctls = Vec::new();
295 while let Some((serial_no, (txn, fut_ctl))) = pending.remove_entry(&next_serial) {
297 let entry_len = txn.serialised_byte_len();
298 if len + entry_len > self.capacity - OFFSETOF_ENTRIES {
299 let None = pending.insert(serial_no, (txn, fut_ctl)) else {
302 unreachable!();
303 };
304 break;
305 };
306 next_serial += 1;
307 for w in txn.writes {
308 let data_len: u32 = w.data.len().try_into().unwrap();
309 raw.extend_from_slice(&w.offset.to_be_bytes());
310 raw.extend_from_slice(&data_len.to_be_bytes());
311 raw.extend_from_slice(&w.data);
312 writes.push((w.offset, w.data));
313 if w.is_overlay {
314 overlays_to_delete.push((w.offset, serial_no));
315 };
316 }
317 len += entry_len;
318 fut_ctls.push(fut_ctl);
319 }
320 if fut_ctls.is_empty() {
321 assert!(overlays_to_delete.is_empty());
323 assert!(writes.is_empty());
324 continue;
325 };
326 raw.write_u32_be_at(OFFSETOF_LEN, u32::try_from(len).unwrap());
327 let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
328 raw.write_at(OFFSETOF_HASH, hash.as_bytes());
329 self.device.write_at(self.offset, raw).await;
330
331 let writes_ordered = merge_overlapping_writes(writes);
333 iter(writes_ordered)
334 .for_each_concurrent(None, async |(offset, data)| {
335 self.device.write_at(offset, data).await;
336 })
337 .await;
338
339 for fut_ctl in fut_ctls.drain(..) {
340 fut_ctl.signal(());
341 }
342
343 for (offset, serial_no) in overlays_to_delete.drain(..) {
344 self
345 .overlay
346 .remove_if(&offset, |_, e| e.serial_no <= serial_no);
347 }
348
349 self
350 .device
351 .write_at(self.offset, self.generate_blank_state())
352 .await;
353 }
354 }
355}