1pub mod merge;
2
3use crate::merge::merge_overlapping_writes;
4use dashmap::DashMap;
5use futures::stream::iter;
6use futures::StreamExt;
7use off64::int::Off64ReadInt;
8use off64::int::Off64WriteMutInt;
9use off64::usz;
10use off64::Off64Read;
11use off64::Off64WriteMut;
12use signal_future::SignalFuture;
13use signal_future::SignalFutureController;
14use std::collections::HashMap;
15use std::os::unix::fs::FileExt;
16use std::path::Path;
17use std::sync::atomic::AtomicU64;
18use std::sync::Arc;
19use tokio::fs::OpenOptions;
20use tokio::sync::mpsc::unbounded_channel;
21use tokio::sync::mpsc::UnboundedReceiver;
22use tokio::sync::mpsc::UnboundedSender;
23use tokio::task::spawn_blocking;
24use tracing::info;
25use tracing::warn;
26
// Journal header layout: a 32-byte BLAKE3 hash, then a big-endian u32 length
// of the serialised entries, then the entries themselves.
const OFFSETOF_HASH: u64 = 0;
const OFFSETOF_LEN: u64 = OFFSETOF_HASH + 32;
const OFFSETOF_ENTRIES: u64 = OFFSETOF_LEN + 4;
30
/// A single queued write belonging to a `Transaction`.
pub struct TransactionWrite {
  /// Absolute device offset the data will be written at.
  pub offset: u64,
  /// Bytes to write.
  pub data: Vec<u8>,
  /// True if this write was also published to the journal's overlay map,
  /// so the matching overlay entry must be cleaned up after commit.
  pub is_overlay: bool,
}
37
/// A batch of writes applied atomically via `WriteJournal::commit_transaction`.
pub struct Transaction {
  /// Unique serial number; the background loop commits transactions in
  /// strictly ascending serial order.
  serial_no: u64,
  /// Writes queued so far, in insertion order.
  writes: Vec<TransactionWrite>,
  /// Overlay map shared with the owning `WriteJournal`.
  overlay: Arc<DashMap<u64, OverlayEntry>>,
}
43
44impl Transaction {
45 fn serialised_byte_len(&self) -> u64 {
46 u64::try_from(
47 self
48 .writes
49 .iter()
50 .map(|w| 8 + 4 + w.data.len())
51 .sum::<usize>(),
52 )
53 .unwrap()
54 }
55
56 pub fn new(serial_no: u64, overlay: Arc<DashMap<u64, OverlayEntry>>) -> Self {
58 Self {
59 serial_no,
60 writes: Vec::new(),
61 overlay,
62 }
63 }
64
65 pub fn serial_no(&self) -> u64 {
66 self.serial_no
67 }
68
69 pub fn into_writes(self) -> Vec<TransactionWrite> {
70 self.writes
71 }
72
73 pub fn write(&mut self, offset: u64, data: Vec<u8>) -> &mut Self {
75 self.writes.push(TransactionWrite {
76 offset,
77 data,
78 is_overlay: false,
79 });
80 self
81 }
82
83 pub fn write_with_overlay(&mut self, offset: u64, data: Vec<u8>) -> &mut Self {
86 self.overlay.insert(offset, OverlayEntry {
87 data: data.clone(),
88 serial_no: self.serial_no,
89 });
90 self.writes.push(TransactionWrite {
91 offset,
92 data,
93 is_overlay: true,
94 });
95 self
96 }
97}
98
/// Data published by `Transaction::write_with_overlay`, visible to readers
/// before the write reaches the device.
pub struct OverlayEntry {
  pub data: Vec<u8>,
  /// Serial number of the transaction that published this entry; used so a
  /// commit only removes entries it wrote, never a newer transaction's data.
  pub serial_no: u64,
}
106
107struct DsyncFile(Arc<std::fs::File>);
108
109impl DsyncFile {
110 pub async fn open(path: &Path) -> Self {
111 Self(
112 OpenOptions::new()
113 .read(true)
114 .write(true)
115 .custom_flags(libc::O_DSYNC)
116 .open(path)
117 .await
118 .unwrap()
119 .into_std()
120 .await
121 .into(),
122 )
123 }
124
125 pub async fn read_at(&self, offset: u64, len: u64) -> Vec<u8> {
126 let file = self.0.clone();
127 spawn_blocking(move || {
128 let mut buf = vec![0u8; usz!(len)];
129 file.read_exact_at(&mut buf, offset).unwrap();
130 buf
131 })
132 .await
133 .unwrap()
134 }
135
136 pub async fn write_at(&self, offset: u64, data: Vec<u8>) {
137 let file = self.0.clone();
138 spawn_blocking(move || {
139 file.write_all_at(&data, offset).unwrap();
140 })
141 .await
142 .unwrap()
143 }
144}
145
/// A write journal occupying `[offset, offset + capacity)` of a device file.
/// Transactions are serialised into the journal region first, then applied
/// to their target offsets, so a crash mid-apply can be replayed by
/// `recover`.
pub struct WriteJournal {
  device: DsyncFile,
  /// Start of the journal region on the device.
  offset: u64,
  /// Total size of the journal region, including the header.
  capacity: u64,
  /// Channel feeding the background commit loop.
  sender: UnboundedSender<(Transaction, SignalFutureController)>,
  /// Source of unique transaction serial numbers.
  next_txn_serial_no: AtomicU64,
  /// Not-yet-durable data readable via `read_with_overlay`, keyed by device
  /// offset.
  overlay: Arc<DashMap<u64, OverlayEntry>>,
}
154
155impl WriteJournal {
156 pub async fn open(path: &Path, offset: u64, capacity: u64) -> Arc<Self> {
157 assert!(capacity > OFFSETOF_ENTRIES && capacity <= u32::MAX.into());
158 let (tx, rx) = unbounded_channel();
159 let journal = Arc::new(Self {
160 device: DsyncFile::open(path).await,
162 offset,
163 capacity,
164 sender: tx,
165 next_txn_serial_no: AtomicU64::new(0),
166 overlay: Default::default(),
167 });
168 tokio::task::spawn({
169 let journal = journal.clone();
170 async move { journal.start_commit_background_loop(rx).await }
171 });
172 journal
173 }
174
175 pub fn generate_blank_state(&self) -> Vec<u8> {
176 let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
177 raw.write_u32_be_at(OFFSETOF_LEN, 0u32);
178 let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
179 raw.write_at(OFFSETOF_HASH, hash.as_bytes());
180 raw
181 }
182
183 pub async fn format_device(&self) {
184 self
185 .device
186 .write_at(self.offset, self.generate_blank_state())
187 .await;
188 }
189
190 pub async fn recover(&self) {
191 let mut raw = self
192 .device
193 .read_at(self.offset, OFFSETOF_ENTRIES)
194 .await
195 .to_vec();
196 let len: u64 = raw.read_u32_be_at(OFFSETOF_LEN).into();
197 if len > self.capacity - OFFSETOF_ENTRIES {
198 warn!("journal is corrupt, has invalid length, skipping recovery");
199 return;
200 };
201 raw.extend_from_slice(
202 &mut self
203 .device
204 .read_at(self.offset + OFFSETOF_ENTRIES, len)
205 .await,
206 );
207 let expected_hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
208 let recorded_hash = &raw[..usz!(OFFSETOF_LEN)];
209 if expected_hash.as_bytes() != recorded_hash {
210 warn!("journal is corrupt, has invalid hash, skipping recovery");
211 return;
212 };
213 if len == 0 {
214 info!("journal is empty, no recovery necessary");
215 return;
216 };
217 let mut recovered_bytes_total = 0;
218 let mut journal_offset = OFFSETOF_ENTRIES;
219 while journal_offset < len {
220 let offset = raw.read_u64_be_at(journal_offset);
221 journal_offset += 8;
222 let data_len = raw.read_u32_be_at(journal_offset);
223 journal_offset += 4;
224 let data = raw.read_at(journal_offset, data_len.into()).to_vec();
225 journal_offset += u64::from(data_len);
226 self.device.write_at(offset, data).await;
227 recovered_bytes_total += data_len;
228 }
229
230 self
232 .device
233 .write_at(self.offset, self.generate_blank_state())
234 .await;
235 info!(
236 recovered_entries = len,
237 recovered_bytes = recovered_bytes_total,
238 "journal has been recovered"
239 );
240 }
241
242 pub fn begin_transaction(&self) -> Transaction {
243 let serial_no = self
244 .next_txn_serial_no
245 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
246 Transaction::new(serial_no, self.overlay.clone())
247 }
248
249 pub async fn commit_transaction(&self, txn: Transaction) {
250 let (fut, fut_ctl) = SignalFuture::new();
251 self.sender.send((txn, fut_ctl)).unwrap();
252 fut.await;
253 }
254
255 pub async fn read_with_overlay(&self, offset: u64, len: u64) -> Vec<u8> {
257 if let Some(e) = self.overlay.get(&offset) {
258 let overlay_len = e.value().data.len();
259 assert_eq!(
260 overlay_len,
261 usz!(len),
262 "overlay data at {offset} has length {overlay_len} but requested length {len}"
263 );
264 e.value().data.clone()
265 } else {
266 self.device.read_at(offset, len).await
267 }
268 }
269
270 pub async fn clear_from_overlay(&self, offset: u64) {
273 self.overlay.remove(&offset);
274 }
275
276 async fn start_commit_background_loop(
277 &self,
278 mut rx: UnboundedReceiver<(Transaction, SignalFutureController)>,
279 ) {
280 let mut next_serial = 0;
281 let mut pending = HashMap::new();
282 while let Some((txn, fut_ctl)) = rx.recv().await {
285 pending.insert(txn.serial_no, (txn, fut_ctl));
286
287 let mut len = 0;
288 let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
289 let mut writes = Vec::new();
290 let mut overlays_to_delete = Vec::new();
291 let mut fut_ctls = Vec::new();
292 while let Some((serial_no, (txn, fut_ctl))) = pending.remove_entry(&next_serial) {
294 let entry_len = txn.serialised_byte_len();
295 if len + entry_len > self.capacity - OFFSETOF_ENTRIES {
296 let None = pending.insert(serial_no, (txn, fut_ctl)) else {
299 unreachable!();
300 };
301 break;
302 };
303 next_serial += 1;
304 for w in txn.writes {
305 let data_len: u32 = w.data.len().try_into().unwrap();
306 raw.extend_from_slice(&w.offset.to_be_bytes());
307 raw.extend_from_slice(&data_len.to_be_bytes());
308 raw.extend_from_slice(&w.data);
309 writes.push((w.offset, w.data));
310 if w.is_overlay {
311 overlays_to_delete.push((w.offset, serial_no));
312 };
313 }
314 len += entry_len;
315 fut_ctls.push(fut_ctl);
316 }
317 if fut_ctls.is_empty() {
318 assert!(overlays_to_delete.is_empty());
320 assert!(writes.is_empty());
321 continue;
322 };
323 raw.write_u32_be_at(OFFSETOF_LEN, u32::try_from(len).unwrap());
324 let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
325 raw.write_at(OFFSETOF_HASH, hash.as_bytes());
326 self.device.write_at(self.offset, raw).await;
327
328 let writes_ordered = merge_overlapping_writes(writes);
330 iter(writes_ordered)
331 .for_each_concurrent(None, async |(_, (offset, data))| {
332 self.device.write_at(offset, data).await;
333 })
334 .await;
335
336 for fut_ctl in fut_ctls.drain(..) {
337 fut_ctl.signal(());
338 }
339
340 for (offset, serial_no) in overlays_to_delete.drain(..) {
341 self
342 .overlay
343 .remove_if(&offset, |_, e| e.serial_no <= serial_no);
344 }
345
346 self
347 .device
348 .write_at(self.offset, self.generate_blank_state())
349 .await;
350 }
351 }
352}