1use dashmap::DashMap;
2use off64::int::Off64ReadInt;
3use off64::int::Off64WriteMutInt;
4use off64::usz;
5use off64::Off64Read;
6use off64::Off64WriteMut;
7use seekable_async_file::SeekableAsyncFile;
8use seekable_async_file::WriteRequest;
9use signal_future::SignalFuture;
10use signal_future::SignalFutureController;
11use std::iter::once;
12use std::sync::atomic::AtomicU64;
13use std::sync::Arc;
14use std::time::Duration;
15use tokio::time::sleep;
16use tracing::info;
17use tracing::warn;
18
/// Offset, relative to the start of the journal region, of the 32-byte BLAKE3 hash. The hash
/// covers every journal byte from `OFFSETOF_LEN` onwards (length field plus entries).
const OFFSETOF_HASH: u64 = 0;
/// Offset of the big-endian `u32` that stores the total byte length of the serialised entries.
/// It sits directly after the 32 bytes reserved for the hash.
const OFFSETOF_LEN: u64 = OFFSETOF_HASH + 32;
/// Offset of the first serialised entry, directly after the length field.
// Lossless widening cast: the length field is a `u32`, so this adds 4.
const OFFSETOF_ENTRIES: u64 = OFFSETOF_LEN + std::mem::size_of::<u32>() as u64;
22
/// A single write recorded by a `Transaction`: `data` is to be written to the device at
/// `offset`.
///
/// Implements the common comparison/debug traits so callers can log, clone and compare writes
/// (for example in tests that inspect `Transaction::into_writes`).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TransactionWrite {
    /// Absolute device offset at which `data` is written.
    pub offset: u64,
    /// Bytes to write at `offset`.
    pub data: Vec<u8>,
    /// `true` when the write was recorded through `Transaction::write_with_overlay`, meaning an
    /// overlay entry was published for `offset` and is cleaned up once the write is committed.
    pub is_overlay: bool,
}
29
30pub struct Transaction {
31 serial_no: u64,
32 writes: Vec<TransactionWrite>,
33 overlay: Arc<DashMap<u64, OverlayEntry>>,
34}
35
36impl Transaction {
37 fn serialised_byte_len(&self) -> u64 {
38 u64::try_from(
39 self
40 .writes
41 .iter()
42 .map(|w| 8 + 4 + w.data.len())
43 .sum::<usize>(),
44 )
45 .unwrap()
46 }
47
48 pub fn new(serial_no: u64, overlay: Arc<DashMap<u64, OverlayEntry>>) -> Self {
50 Self {
51 serial_no,
52 writes: Vec::new(),
53 overlay,
54 }
55 }
56
57 pub fn serial_no(&self) -> u64 {
58 self.serial_no
59 }
60
61 pub fn into_writes(self) -> Vec<TransactionWrite> {
62 self.writes
63 }
64
65 pub fn write(&mut self, offset: u64, data: Vec<u8>) -> &mut Self {
67 self.writes.push(TransactionWrite {
68 offset,
69 data,
70 is_overlay: false,
71 });
72 self
73 }
74
75 pub fn write_with_overlay(&mut self, offset: u64, data: Vec<u8>) -> &mut Self {
78 self.overlay.insert(offset, OverlayEntry {
79 data: data.clone(),
80 serial_no: self.serial_no,
81 });
82 self.writes.push(TransactionWrite {
83 offset,
84 data,
85 is_overlay: true,
86 });
87 self
88 }
89}
90
/// Value stored in the overlay map for a device offset: the most recently queued data for that
/// offset, served by `WriteJournal::read_with_overlay` instead of reading the device.
///
/// Implements the common comparison/debug traits so entries can be logged, cloned and compared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct OverlayEntry {
    /// Bytes queued for the offset this entry is keyed by.
    pub data: Vec<u8>,
    /// Serial number of the transaction that inserted this entry. The commit loop only removes
    /// an entry whose serial number is not newer than the transaction it has just committed.
    pub serial_no: u64,
}
98
99pub struct WriteJournal {
100 device: SeekableAsyncFile,
101 offset: u64,
102 capacity: u64,
103 pending: DashMap<u64, (Transaction, SignalFutureController)>,
104 next_txn_serial_no: AtomicU64,
105 commit_delay: Duration,
106 overlay: Arc<DashMap<u64, OverlayEntry>>,
107}
108
109impl WriteJournal {
110 pub fn new(
111 device: SeekableAsyncFile,
112 offset: u64,
113 capacity: u64,
114 commit_delay: Duration,
115 ) -> Self {
116 assert!(capacity > OFFSETOF_ENTRIES && capacity <= u32::MAX.into());
117 Self {
118 device,
119 offset,
120 capacity,
121 pending: Default::default(),
122 next_txn_serial_no: AtomicU64::new(0),
123 commit_delay,
124 overlay: Default::default(),
125 }
126 }
127
128 pub fn generate_blank_state(&self) -> Vec<u8> {
129 let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
130 raw.write_u32_be_at(OFFSETOF_LEN, 0u32);
131 let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
132 raw.write_at(OFFSETOF_HASH, hash.as_bytes());
133 raw
134 }
135
136 #[cfg(any(feature = "io_mmap", feature = "io_file"))]
137 pub async fn format_device(&self) {
138 self
139 .device
140 .write_at(self.offset, self.generate_blank_state())
141 .await;
142 }
143
144 #[cfg(any(feature = "io_mmap", feature = "io_file"))]
145 pub async fn recover(&self) {
146 let mut raw = self
147 .device
148 .read_at(self.offset, OFFSETOF_ENTRIES)
149 .await
150 .to_vec();
151 let len: u64 = raw.read_u32_be_at(OFFSETOF_LEN).into();
152 if len > self.capacity - OFFSETOF_ENTRIES {
153 warn!("journal is corrupt, has invalid length, skipping recovery");
154 return;
155 };
156 raw.extend_from_slice(
157 &mut self
158 .device
159 .read_at(self.offset + OFFSETOF_ENTRIES, len)
160 .await,
161 );
162 let expected_hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
163 let recorded_hash = &raw[..usz!(OFFSETOF_LEN)];
164 if expected_hash.as_bytes() != recorded_hash {
165 warn!("journal is corrupt, has invalid hash, skipping recovery");
166 return;
167 };
168 if len == 0 {
169 info!("journal is empty, no recovery necessary");
170 return;
171 };
172 let mut recovered_bytes_total = 0;
173 let mut journal_offset = OFFSETOF_ENTRIES;
174 while journal_offset < len {
175 let offset = raw.read_u64_be_at(journal_offset);
176 journal_offset += 8;
177 let data_len = raw.read_u32_be_at(journal_offset);
178 journal_offset += 4;
179 let data = raw.read_at(journal_offset, data_len.into()).to_vec();
180 journal_offset += u64::from(data_len);
181 self.device.write_at(offset, data).await;
182 recovered_bytes_total += data_len;
183 }
184 self.device.sync_data().await;
185
186 self
188 .device
189 .write_at(self.offset, self.generate_blank_state())
190 .await;
191 self.device.sync_data().await;
192 info!(
193 recovered_entries = len,
194 recovered_bytes = recovered_bytes_total,
195 "journal has been recovered"
196 );
197 }
198
199 pub fn begin_transaction(&self) -> Transaction {
200 let serial_no = self
201 .next_txn_serial_no
202 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
203 Transaction::new(serial_no, self.overlay.clone())
204 }
205
206 pub async fn commit_transaction(&self, txn: Transaction) {
207 let (fut, fut_ctl) = SignalFuture::new();
208 let None = self.pending.insert(txn.serial_no, (txn, fut_ctl)) else {
209 unreachable!();
210 };
211 fut.await;
212 }
213
214 #[cfg(any(feature = "io_mmap", feature = "io_file"))]
216 pub async fn read_with_overlay(&self, offset: u64, len: u64) -> Vec<u8> {
217 if let Some(e) = self.overlay.get(&offset) {
218 let overlay_len = e.value().data.len();
219 assert_eq!(
220 overlay_len,
221 usz!(len),
222 "overlay data at {offset} has length {overlay_len} but requested length {len}"
223 );
224 e.value().data.clone()
225 } else {
226 self.device.read_at(offset, len).await
227 }
228 }
229
230 pub async fn clear_from_overlay(&self, offset: u64) {
233 self.overlay.remove(&offset);
234 }
235
236 #[cfg(any(feature = "io_mmap", feature = "io_file"))]
237 pub async fn start_commit_background_loop(&self) {
238 let mut next_serial = 0;
239
240 let mut writes = Vec::new();
242 let mut fut_ctls = Vec::new();
243 let mut overlays_to_delete = Vec::new();
244 loop {
245 sleep(self.commit_delay).await;
246
247 let mut len = 0;
248 let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
249 while let Some((serial_no, (txn, fut_ctl))) = self.pending.remove(&next_serial) {
251 let entry_len = txn.serialised_byte_len();
252 if len + entry_len > self.capacity - OFFSETOF_ENTRIES {
253 let None = self.pending.insert(serial_no, (txn, fut_ctl)) else {
256 unreachable!();
257 };
258 break;
259 };
260 next_serial += 1;
261 for w in txn.writes {
262 let data_len: u32 = w.data.len().try_into().unwrap();
263 raw.extend_from_slice(&w.offset.to_be_bytes());
264 raw.extend_from_slice(&data_len.to_be_bytes());
265 raw.extend_from_slice(&w.data);
266 writes.push(WriteRequest::new(w.offset, w.data));
267 if w.is_overlay {
268 overlays_to_delete.push((w.offset, serial_no));
269 };
270 }
271 len += entry_len;
272 fut_ctls.push(fut_ctl);
273 }
274 if fut_ctls.is_empty() {
275 assert!(overlays_to_delete.is_empty());
277 assert!(writes.is_empty());
278 continue;
279 };
280 raw.write_u32_be_at(OFFSETOF_LEN, u32::try_from(len).unwrap());
281 let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
282 raw.write_at(OFFSETOF_HASH, hash.as_bytes());
283 self
284 .device
285 .write_at_with_delayed_sync(once(WriteRequest::new(self.offset, raw)))
286 .await;
287
288 self
289 .device
290 .write_at_with_delayed_sync(writes.drain(..))
291 .await;
292
293 for fut_ctl in fut_ctls.drain(..) {
294 fut_ctl.signal(());
295 }
296
297 for (offset, serial_no) in overlays_to_delete.drain(..) {
298 self
299 .overlay
300 .remove_if(&offset, |_, e| e.serial_no <= serial_no);
301 }
302
303 self
305 .device
306 .write_at(self.offset, self.generate_blank_state())
307 .await;
308 self.device.sync_data().await;
309 }
310 }
311}