use dashmap::DashMap;
use off64::int::Off64ReadInt;
use off64::int::Off64WriteMutInt;
use off64::usz;
use off64::Off64Read;
use off64::Off64WriteMut;
use rustc_hash::FxHasher;
use seekable_async_file::SeekableAsyncFile;
use seekable_async_file::WriteRequest;
use signal_future::SignalFuture;
use signal_future::SignalFutureController;
use std::hash::BuildHasherDefault;
use std::iter::once;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use tinybuf::TinyBuf;
use tokio::time::sleep;
use tracing::info;
use tracing::warn;

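// Layout of the journal region on the device, starting at `offset`:
//   [0, 32)   BLAKE3 hash of everything from OFFSETOF_LEN onwards
//   [32, 36)  big-endian u32 total byte length of the serialised entries
//   [36, ..)  entries, each serialised as (u64 BE target offset, u32 BE data length, data)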
const OFFSETOF_HASH: u64 = 0;
const OFFSETOF_LEN: u64 = OFFSETOF_HASH + 32;
const OFFSETOF_ENTRIES: u64 = OFFSETOF_LEN + 4;

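/// A single queued device write belonging to a [`Transaction`].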
struct TransactionWrite {
  offset: u64,
  data: TinyBuf,
  is_overlay: bool,
}

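/// A batch of writes that is journalled and applied as a unit by the
/// background commit loop. Created via [`WriteJournal::begin_transaction`].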
pub struct Transaction {
  serial_no: u64,
  writes: Vec<TransactionWrite>,
  overlay: Arc<DashMap<u64, OverlayEntry, BuildHasherDefault<FxHasher>>>,
}

impl Transaction {
  fn serialised_byte_len(&self) -> u64 {
    u64::try_from(
      self
        .writes
        .iter()
        .map(|w| 8 + 4 + w.data.len())
        .sum::<usize>(),
    )
    .unwrap()
  }

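  /// Queue a write of `data` at device offset `offset`; it is applied when the
  /// transaction is committed.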
  pub fn write<D: Into<TinyBuf>>(&mut self, offset: u64, data: D) -> &mut Self {
    let data = data.into();
    self.writes.push(TransactionWrite {
      offset,
      data,
      is_overlay: false,
    });
    self
  }

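  /// Queue a write and also publish it to the shared overlay immediately, so
  /// [`WriteJournal::read_with_overlay`] returns the new value even before the
  /// write reaches the device.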
  pub fn write_with_overlay<D: Into<TinyBuf>>(&mut self, offset: u64, data: D) -> &mut Self {
    let data = data.into();
    self.overlay.insert(offset, OverlayEntry {
      data: data.clone(),
      serial_no: self.serial_no,
    });
    self.writes.push(TransactionWrite {
      offset,
      data,
      is_overlay: true,
    });
    self
  }
}

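/// Value held in the in-memory overlay. `serial_no` records the transaction
/// that last wrote this offset; the commit loop uses it to avoid removing an
/// entry that a newer transaction has since overwritten.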
struct OverlayEntry {
  data: TinyBuf,
  serial_no: u64,
}

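/// A write-ahead journal occupying `capacity` bytes of `device` starting at
/// `offset`. Writes are batched into transactions, journalled, then applied to
/// their final positions; an in-memory overlay makes data written with
/// [`Transaction::write_with_overlay`] readable before it is persisted.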
pub struct WriteJournal {
  device: SeekableAsyncFile,
  offset: u64,
  capacity: u64,
  pending: DashMap<u64, (Transaction, SignalFutureController), BuildHasherDefault<FxHasher>>,
  next_txn_serial_no: AtomicU64,
  commit_delay: Duration,
  overlay: Arc<DashMap<u64, OverlayEntry, BuildHasherDefault<FxHasher>>>,
}

impl WriteJournal {
  pub fn new(
    device: SeekableAsyncFile,
    offset: u64,
    capacity: u64,
    commit_delay: Duration,
  ) -> Self {
    assert!(capacity > OFFSETOF_ENTRIES && capacity <= u32::MAX.into());
    Self {
      device,
      offset,
      capacity,
      pending: Default::default(),
      next_txn_serial_no: AtomicU64::new(0),
      commit_delay,
      overlay: Default::default(),
    }
  }

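  /// Serialised header of an empty journal: a zero entry length plus its hash.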
  pub fn generate_blank_state(&self) -> Vec<u8> {
    let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
    raw.write_u32_be_at(OFFSETOF_LEN, 0u32);
    let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
    raw.write_at(OFFSETOF_HASH, hash.as_bytes());
    raw
  }

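  /// Overwrite the journal region with a blank (empty) journal.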
  pub async fn format_device(&self) {
    self
      .device
      .write_at(self.offset, self.generate_blank_state())
      .await;
  }

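  /// Replay any entries left in the journal onto their target offsets, then
  /// reset the journal to the blank state. A corrupt journal (bad length or
  /// hash) is skipped rather than replayed.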
  pub async fn recover(&self) {
    let mut raw = self
      .device
      .read_at(self.offset, OFFSETOF_ENTRIES)
      .await
      .to_vec();
    let len: u64 = raw.read_u32_be_at(OFFSETOF_LEN).into();
    if len > self.capacity - OFFSETOF_ENTRIES {
      warn!("journal is corrupt, has invalid length, skipping recovery");
      return;
    };
    raw.extend_from_slice(
      &self
        .device
        .read_at(self.offset + OFFSETOF_ENTRIES, len)
        .await,
    );
    let expected_hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
    let recorded_hash = &raw[..usz!(OFFSETOF_LEN)];
    if expected_hash.as_bytes() != recorded_hash {
      warn!("journal is corrupt, has invalid hash, skipping recovery");
      return;
    };
    if len == 0 {
      info!("journal is empty, no recovery necessary");
      return;
    };
    let mut recovered_entries = 0;
    let mut recovered_bytes_total = 0;
    // Entries start after the header and span `len` bytes.
    let mut journal_offset = OFFSETOF_ENTRIES;
    while journal_offset < OFFSETOF_ENTRIES + len {
      let offset = raw.read_u64_be_at(journal_offset);
      journal_offset += 8;
      let data_len = raw.read_u32_be_at(journal_offset);
      journal_offset += 4;
      let data = TinyBuf::from_slice(raw.read_at(journal_offset, data_len.into()));
      journal_offset += u64::from(data_len);
      self.device.write_at(offset, data).await;
      recovered_entries += 1;
      recovered_bytes_total += data_len;
    }
    self.device.sync_data().await;

    self
      .device
      .write_at(self.offset, self.generate_blank_state())
      .await;
    self.device.sync_data().await;
    info!(
      recovered_entries,
      recovered_bytes = recovered_bytes_total,
      "journal has been recovered"
    );
  }

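  /// Create a new empty transaction with a unique, monotonically increasing
  /// serial number.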
  pub fn begin_transaction(&self) -> Transaction {
    let serial_no = self
      .next_txn_serial_no
      .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    Transaction {
      serial_no,
      writes: Vec::new(),
      overlay: self.overlay.clone(),
    }
  }

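  /// Queue `txn` for the background commit loop and wait until its writes have
  /// been journalled and applied with a sync.
  ///
  /// A minimal usage sketch; it assumes a `journal: Arc<WriteJournal>` whose
  /// `start_commit_background_loop` is already running on another task:
  ///
  /// ```ignore
  /// let mut txn = journal.begin_transaction();
  /// txn.write(4096, TinyBuf::from_slice(b"hello"));
  /// journal.commit_transaction(txn).await;
  /// ```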
  pub async fn commit_transaction(&self, txn: Transaction) {
    let (fut, fut_ctl) = SignalFuture::new();
    let None = self.pending.insert(txn.serial_no, (txn, fut_ctl)) else {
      unreachable!();
    };
    fut.await;
  }

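  /// Read `len` bytes at `offset`, returning the overlay value for that exact
  /// offset if one exists, otherwise reading from the device. The overlay
  /// entry's length must match the requested length.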
  pub async fn read_with_overlay(&self, offset: u64, len: u64) -> TinyBuf {
    if let Some(e) = self.overlay.get(&offset) {
      let overlay_len = e.value().data.len();
      assert_eq!(
        overlay_len,
        usz!(len),
        "overlay data at {offset} has length {overlay_len} but requested length {len}"
      );
      e.value().data.clone()
    } else {
      self.device.read_at(offset, len).await.into()
    }
  }

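  /// Remove any overlay entry at `offset`, regardless of its serial number.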
  pub async fn clear_from_overlay(&self, offset: u64) {
    self.overlay.remove(&offset);
  }

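  /// Commit loop: every `commit_delay`, drain pending transactions in serial
  /// order into one journal record (up to `capacity`), persist the record,
  /// apply the writes, wake the waiting committers, prune the overlay, and
  /// finally erase the journal. Must be running for
  /// [`WriteJournal::commit_transaction`] to ever resolve.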
  pub async fn start_commit_background_loop(&self) {
    let mut next_serial = 0;

    // Reused across iterations to avoid reallocating every cycle.
    let mut writes = Vec::new();
    let mut fut_ctls = Vec::new();
    let mut overlays_to_delete = Vec::new();
    loop {
      sleep(self.commit_delay).await;

      let mut len = 0;
      let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
      // Drain pending transactions in serial order until the journal record is full.
      while let Some((serial_no, (txn, fut_ctl))) = self.pending.remove(&next_serial) {
        let entry_len = txn.serialised_byte_len();
        if len + entry_len > self.capacity - OFFSETOF_ENTRIES {
          // No room left in this record; put the transaction back for the next cycle.
          let None = self.pending.insert(serial_no, (txn, fut_ctl)) else {
            unreachable!();
          };
          break;
        };
        next_serial += 1;
        for w in txn.writes {
          let data_len: u32 = w.data.len().try_into().unwrap();
          raw.extend_from_slice(&w.offset.to_be_bytes());
          raw.extend_from_slice(&data_len.to_be_bytes());
          raw.extend_from_slice(&w.data);
          writes.push(WriteRequest::new(w.offset, w.data));
          if w.is_overlay {
            overlays_to_delete.push((w.offset, serial_no));
          };
        }
        len += entry_len;
        fut_ctls.push(fut_ctl);
      }
      if fut_ctls.is_empty() {
        assert!(overlays_to_delete.is_empty());
        assert!(writes.is_empty());
        continue;
      };
      raw.write_u32_be_at(OFFSETOF_LEN, u32::try_from(len).unwrap());
      let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
      raw.write_at(OFFSETOF_HASH, hash.as_bytes());
      // Persist the journal record before touching the final destinations.
      self
        .device
        .write_at_with_delayed_sync(once(WriteRequest::new(self.offset, raw)))
        .await;

      // Apply the writes to their target offsets.
      self
        .device
        .write_at_with_delayed_sync(writes.drain(..))
        .await;

      for fut_ctl in fut_ctls.drain(..) {
        fut_ctl.signal(());
      }

      // Only drop overlay entries that were not overwritten by a newer transaction.
      for (offset, serial_no) in overlays_to_delete.drain(..) {
        self
          .overlay
          .remove_if(&offset, |_, e| e.serial_no <= serial_no);
      }

      // Everything is durable; erase the journal for the next cycle.
      self
        .device
        .write_at(self.offset, self.generate_blank_state())
        .await;
      self.device.sync_data().await;
    }
  }
}