use dashmap::DashMap;
use off64::int::Off64ReadInt;
use off64::int::Off64WriteMutInt;
use off64::usz;
use off64::Off64Read;
use off64::Off64WriteMut;
use seekable_async_file::SeekableAsyncFile;
use seekable_async_file::WriteRequest;
use signal_future::SignalFuture;
use signal_future::SignalFutureController;
use std::iter::once;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tracing::info;
use tracing::warn;

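// Layout of the journal region on the device: a 32-byte BLAKE3 hash of everything that
// follows it, then a 4-byte big-endian byte length of the serialised entries, then the
// entries themselves. Each entry is an 8-byte big-endian device offset, a 4-byte big-endian
// data length, and the data bytes.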
const OFFSETOF_HASH: u64 = 0;
const OFFSETOF_LEN: u64 = OFFSETOF_HASH + 32;
const OFFSETOF_ENTRIES: u64 = OFFSETOF_LEN + 4;

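/// A single write queued within a `Transaction`: `data` to be written at `offset` on the
/// device. `is_overlay` marks writes that were also placed in the read overlay.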
pub struct TransactionWrite {
  pub offset: u64,
  pub data: Vec<u8>,
  pub is_overlay: bool,
}

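/// A batch of writes that is journalled and applied together. Created with
/// `WriteJournal::begin_transaction` and persisted with `WriteJournal::commit_transaction`.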
pub struct Transaction {
  serial_no: u64,
  writes: Vec<TransactionWrite>,
  overlay: Arc<DashMap<u64, OverlayEntry>>,
}

impl Transaction {
  fn serialised_byte_len(&self) -> u64 {
    u64::try_from(
      self
        .writes
        .iter()
        .map(|w| 8 + 4 + w.data.len())
        .sum::<usize>(),
    )
    .unwrap()
  }

  pub fn new(serial_no: u64, overlay: Arc<DashMap<u64, OverlayEntry>>) -> Self {
    Self {
      serial_no,
      writes: Vec::new(),
      overlay,
    }
  }

  pub fn serial_no(&self) -> u64 {
    self.serial_no
  }

  pub fn into_writes(self) -> Vec<TransactionWrite> {
    self.writes
  }

  /// Queues `data` to be written at `offset` when this transaction is committed.
  pub fn write(&mut self, offset: u64, data: Vec<u8>) -> &mut Self {
    self.writes.push(TransactionWrite {
      offset,
      data,
      is_overlay: false,
    });
    self
  }

  /// Like `write`, but also makes `data` immediately visible to
  /// `WriteJournal::read_with_overlay`, before the transaction has been committed.
  pub fn write_with_overlay(&mut self, offset: u64, data: Vec<u8>) -> &mut Self {
    self.overlay.insert(offset, OverlayEntry {
      data: data.clone(),
      serial_no: self.serial_no,
    });
    self.writes.push(TransactionWrite {
      offset,
      data,
      is_overlay: true,
    });
    self
  }
}

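/// Pending data for an offset in the read overlay, tagged with the serial number of the
/// transaction that wrote it so that committing an older transaction does not evict newer
/// overlay data.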
pub struct OverlayEntry {
  pub data: Vec<u8>,
  pub serial_no: u64,
}

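/// A write-ahead journal stored in a fixed-size region of a `SeekableAsyncFile`, starting at
/// `offset` and at most `capacity` bytes long. Committed transactions are first persisted to
/// the journal, then applied to their final positions, after which the journal is cleared.
///
/// A rough usage sketch; the device, offset, capacity and commit delay below are placeholder
/// values, and constructing the `SeekableAsyncFile` is out of scope here:
///
/// ```ignore
/// let journal = Arc::new(WriteJournal::new(device, 0, 1024 * 1024, Duration::from_millis(10)));
/// journal.recover().await;
/// // The background loop must be running or `commit_transaction` will never resolve.
/// tokio::spawn({
///   let journal = journal.clone();
///   async move { journal.start_commit_background_loop().await }
/// });
/// let mut txn = journal.begin_transaction();
/// txn.write(4096, vec![1, 2, 3]);
/// journal.commit_transaction(txn).await;
/// ```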
pub struct WriteJournal {
  device: SeekableAsyncFile,
  offset: u64,
  capacity: u64,
  pending: DashMap<u64, (Transaction, SignalFutureController)>,
  next_txn_serial_no: AtomicU64,
  commit_delay: Duration,
  overlay: Arc<DashMap<u64, OverlayEntry>>,
}

impl WriteJournal {
  pub fn new(
    device: SeekableAsyncFile,
    offset: u64,
    capacity: u64,
    commit_delay: Duration,
  ) -> Self {
    assert!(capacity > OFFSETOF_ENTRIES && capacity <= u32::MAX.into());
    Self {
      device,
      offset,
      capacity,
      pending: Default::default(),
      next_txn_serial_no: AtomicU64::new(0),
      commit_delay,
      overlay: Default::default(),
    }
  }

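  /// Serialises an empty journal header: a zero entry length plus the matching hash.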
  pub fn generate_blank_state(&self) -> Vec<u8> {
    let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
    raw.write_u32_be_at(OFFSETOF_LEN, 0u32);
    let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
    raw.write_at(OFFSETOF_HASH, hash.as_bytes());
    raw
  }

  pub async fn format_device(&self) {
    self
      .device
      .write_at(self.offset, self.generate_blank_state())
      .await;
  }

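  /// Replays any entries left in the journal by a previous run: validates the recorded
  /// length and hash, applies each entry to its target offset on the device, syncs, and
  /// then resets the journal region to the blank state.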
  pub async fn recover(&self) {
    let mut raw = self
      .device
      .read_at(self.offset, OFFSETOF_ENTRIES)
      .await
      .to_vec();
    let len: u64 = raw.read_u32_be_at(OFFSETOF_LEN).into();
    if len > self.capacity - OFFSETOF_ENTRIES {
      warn!("journal is corrupt, has invalid length, skipping recovery");
      return;
    };
    raw.extend_from_slice(
      &self
        .device
        .read_at(self.offset + OFFSETOF_ENTRIES, len)
        .await,
    );
    let expected_hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
    let recorded_hash = &raw[..usz!(OFFSETOF_LEN)];
    if expected_hash.as_bytes() != recorded_hash {
      warn!("journal is corrupt, has invalid hash, skipping recovery");
      return;
    };
    if len == 0 {
      info!("journal is empty, no recovery necessary");
      return;
    };
    let mut recovered_entries = 0u64;
    let mut recovered_bytes_total = 0;
    let mut journal_offset = OFFSETOF_ENTRIES;
    // Entries start at OFFSETOF_ENTRIES and span `len` bytes.
    while journal_offset < OFFSETOF_ENTRIES + len {
      let offset = raw.read_u64_be_at(journal_offset);
      journal_offset += 8;
      let data_len = raw.read_u32_be_at(journal_offset);
      journal_offset += 4;
      let data = raw.read_at(journal_offset, data_len.into()).to_vec();
      journal_offset += u64::from(data_len);
      self.device.write_at(offset, data).await;
      recovered_entries += 1;
      recovered_bytes_total += data_len;
    }
    self.device.sync_data().await;

    // Entries have been applied; clear the journal so they are not replayed again.
    self
      .device
      .write_at(self.offset, self.generate_blank_state())
      .await;
    self.device.sync_data().await;
    info!(
      recovered_entries,
      recovered_bytes = recovered_bytes_total,
      "journal has been recovered"
    );
  }

  pub fn begin_transaction(&self) -> Transaction {
    let serial_no = self
      .next_txn_serial_no
      .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
    Transaction::new(serial_no, self.overlay.clone())
  }

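  /// Queues `txn` for the background commit loop and resolves once its writes have been
  /// journalled and applied to the device. `start_commit_background_loop` must be running,
  /// otherwise this future never completes.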
  pub async fn commit_transaction(&self, txn: Transaction) {
    let (fut, fut_ctl) = SignalFuture::new();
    let None = self.pending.insert(txn.serial_no, (txn, fut_ctl)) else {
      unreachable!();
    };
    fut.await;
  }

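  /// Reads `len` bytes at `offset`, preferring overlay data written by
  /// `Transaction::write_with_overlay` when an entry exists for that exact offset; the
  /// overlay entry's length must equal `len`.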
  pub async fn read_with_overlay(&self, offset: u64, len: u64) -> Vec<u8> {
    if let Some(e) = self.overlay.get(&offset) {
      let overlay_len = e.value().data.len();
      assert_eq!(
        overlay_len,
        usz!(len),
        "overlay data at {offset} has length {overlay_len} but requested length {len}"
      );
      e.value().data.clone()
    } else {
      self.device.read_at(offset, len).await
    }
  }

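  /// Removes any overlay entry at `offset`, regardless of which transaction wrote it.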
  pub async fn clear_from_overlay(&self, offset: u64) {
    self.overlay.remove(&offset);
  }

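  /// Runs the commit loop forever: every `commit_delay`, drains pending transactions in
  /// serial order (up to the journal's capacity), persists them to the journal, applies
  /// them to their final offsets, wakes the committers, prunes the overlay, and clears the
  /// journal.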
  pub async fn start_commit_background_loop(&self) {
    let mut next_serial = 0;

    // Reused across iterations; each is drained after the batch has been committed.
    let mut writes = Vec::new();
    let mut fut_ctls = Vec::new();
    let mut overlays_to_delete = Vec::new();
    loop {
      sleep(self.commit_delay).await;

      let mut len = 0;
      let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
      // Collect pending transactions in strict serial order.
      while let Some((serial_no, (txn, fut_ctl))) = self.pending.remove(&next_serial) {
        let entry_len = txn.serialised_byte_len();
        if len + entry_len > self.capacity - OFFSETOF_ENTRIES {
          // No room left in the journal for this batch; requeue the transaction and commit
          // it in a later batch.
          let None = self.pending.insert(serial_no, (txn, fut_ctl)) else {
            unreachable!();
          };
          break;
        };
        next_serial += 1;
        for w in txn.writes {
          let data_len: u32 = w.data.len().try_into().unwrap();
          raw.extend_from_slice(&w.offset.to_be_bytes());
          raw.extend_from_slice(&data_len.to_be_bytes());
          raw.extend_from_slice(&w.data);
          writes.push(WriteRequest::new(w.offset, w.data));
          if w.is_overlay {
            overlays_to_delete.push((w.offset, serial_no));
          };
        }
        len += entry_len;
        fut_ctls.push(fut_ctl);
      }
      if fut_ctls.is_empty() {
        assert!(overlays_to_delete.is_empty());
        assert!(writes.is_empty());
        continue;
      };
      raw.write_u32_be_at(OFFSETOF_LEN, u32::try_from(len).unwrap());
      let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
      raw.write_at(OFFSETOF_HASH, hash.as_bytes());
      self
        .device
        .write_at_with_delayed_sync(once(WriteRequest::new(self.offset, raw)))
        .await;

      self
        .device
        .write_at_with_delayed_sync(writes.drain(..))
        .await;

      for fut_ctl in fut_ctls.drain(..) {
        fut_ctl.signal(());
      }

      for (offset, serial_no) in overlays_to_delete.drain(..) {
        self
          .overlay
          .remove_if(&offset, |_, e| e.serial_no <= serial_no);
      }

      // The batch has been applied to its final positions; clear the journal so its entries
      // are not replayed by a future recovery.
      self
        .device
        .write_at(self.offset, self.generate_blank_state())
        .await;
      self.device.sync_data().await;
    }
  }
}