1use dashmap::DashMap;
2use off64::int::Off64ReadInt;
3use off64::int::Off64WriteMutInt;
4use off64::usz;
5use off64::Off64Read;
6use off64::Off64WriteMut;
7use seekable_async_file::SeekableAsyncFile;
8use seekable_async_file::WriteRequest;
9use signal_future::SignalFuture;
10use signal_future::SignalFutureController;
11use std::iter::once;
12use std::sync::atomic::AtomicU64;
13use std::sync::Arc;
14use std::time::Duration;
15use tinybuf::TinyBuf;
16use tokio::time::sleep;
17use tracing::info;
18use tracing::warn;
19
// On-device journal header layout: [hash][entries length][entries...].

/// Width in bytes of the hash stored at the start of the journal.
const HASH_BYTES: u64 = 32;
/// Width in bytes of the big-endian `u32` holding the entries section length.
const LEN_BYTES: u64 = 4;

/// Offset (relative to the journal start) of the hash covering everything after it.
const OFFSETOF_HASH: u64 = 0;
/// Offset of the length field that follows the hash.
const OFFSETOF_LEN: u64 = OFFSETOF_HASH + HASH_BYTES;
/// Offset of the first serialised entry, immediately after the length field.
const OFFSETOF_ENTRIES: u64 = OFFSETOF_LEN + LEN_BYTES;
23
/// A single write recorded by a [`Transaction`]: `data` to be placed at `offset`.
pub struct TransactionWrite {
    /// Destination offset passed to the device write when the transaction is committed.
    pub offset: u64,
    /// Bytes to write at `offset`.
    pub data: TinyBuf,
    /// `true` when the write was added via `Transaction::write_with_overlay`, i.e. an
    /// overlay entry keyed by `offset` was inserted and must be cleaned up after commit.
    pub is_overlay: bool,
}
30
/// An ordered batch of writes identified by a serial number.
pub struct Transaction {
    // Identifier of this transaction; also stamped onto overlay entries it creates.
    serial_no: u64,
    // Writes in the order they were added.
    writes: Vec<TransactionWrite>,
    // Shared overlay map, keyed by write offset, populated by `write_with_overlay`.
    overlay: Arc<DashMap<u64, OverlayEntry>>,
}
36
37impl Transaction {
38 fn serialised_byte_len(&self) -> u64 {
39 u64::try_from(
40 self
41 .writes
42 .iter()
43 .map(|w| 8 + 4 + w.data.len())
44 .sum::<usize>(),
45 )
46 .unwrap()
47 }
48
49 pub fn new(serial_no: u64, overlay: Arc<DashMap<u64, OverlayEntry>>) -> Self {
51 Self {
52 serial_no,
53 writes: Vec::new(),
54 overlay,
55 }
56 }
57
58 pub fn serial_no(&self) -> u64 {
59 self.serial_no
60 }
61
62 pub fn into_writes(self) -> Vec<TransactionWrite> {
63 self.writes
64 }
65
66 pub fn write<D: Into<TinyBuf>>(&mut self, offset: u64, data: D) -> &mut Self {
68 let data = data.into();
69 self.writes.push(TransactionWrite {
70 offset,
71 data,
72 is_overlay: false,
73 });
74 self
75 }
76
77 pub fn write_with_overlay<D: Into<TinyBuf>>(&mut self, offset: u64, data: D) -> &mut Self {
80 let data = data.into();
81 self.overlay.insert(offset, OverlayEntry {
82 data: data.clone(),
83 serial_no: self.serial_no,
84 });
85 self.writes.push(TransactionWrite {
86 offset,
87 data,
88 is_overlay: true,
89 });
90 self
91 }
92}
93
/// Value stored in the overlay map for an offset written via
/// `Transaction::write_with_overlay`.
pub struct OverlayEntry {
    // Copy of the data queued for the offset this entry is keyed by.
    data: TinyBuf,
    // Serial number of the transaction that inserted this entry; compared against the
    // committed transaction's serial number when deciding whether to remove the entry.
    serial_no: u64,
}
100
/// Journal that records batches of transaction writes in a fixed region of a device
/// before applying them, so they can be replayed by `recover`.
pub struct WriteJournal {
    // Device holding both the journal region and the write targets.
    device: SeekableAsyncFile,
    // Device offset at which the journal region starts.
    offset: u64,
    // Size of the journal region in bytes, including the header.
    capacity: u64,
    // Transactions submitted via `commit_transaction`, keyed by serial number, each with
    // the controller used to signal its committer once it has been applied.
    pending: DashMap<u64, (Transaction, SignalFutureController)>,
    // Serial number handed to the next transaction created by `begin_transaction`.
    next_txn_serial_no: AtomicU64,
    // Sleep duration between iterations of the background commit loop.
    commit_delay: Duration,
    // Overlay map shared with every transaction created by this journal.
    overlay: Arc<DashMap<u64, OverlayEntry>>,
}
110
111impl WriteJournal {
112 pub fn new(
113 device: SeekableAsyncFile,
114 offset: u64,
115 capacity: u64,
116 commit_delay: Duration,
117 ) -> Self {
118 assert!(capacity > OFFSETOF_ENTRIES && capacity <= u32::MAX.into());
119 Self {
120 device,
121 offset,
122 capacity,
123 pending: Default::default(),
124 next_txn_serial_no: AtomicU64::new(0),
125 commit_delay,
126 overlay: Default::default(),
127 }
128 }
129
130 pub fn generate_blank_state(&self) -> Vec<u8> {
131 let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
132 raw.write_u32_be_at(OFFSETOF_LEN, 0u32);
133 let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
134 raw.write_at(OFFSETOF_HASH, hash.as_bytes());
135 raw
136 }
137
138 #[cfg(any(feature = "io_mmap", feature = "io_file"))]
139 pub async fn format_device(&self) {
140 self
141 .device
142 .write_at(self.offset, self.generate_blank_state())
143 .await;
144 }
145
146 #[cfg(any(feature = "io_mmap", feature = "io_file"))]
147 pub async fn recover(&self) {
148 let mut raw = self
149 .device
150 .read_at(self.offset, OFFSETOF_ENTRIES)
151 .await
152 .to_vec();
153 let len: u64 = raw.read_u32_be_at(OFFSETOF_LEN).into();
154 if len > self.capacity - OFFSETOF_ENTRIES {
155 warn!("journal is corrupt, has invalid length, skipping recovery");
156 return;
157 };
158 raw.extend_from_slice(
159 &mut self
160 .device
161 .read_at(self.offset + OFFSETOF_ENTRIES, len)
162 .await,
163 );
164 let expected_hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
165 let recorded_hash = &raw[..usz!(OFFSETOF_LEN)];
166 if expected_hash.as_bytes() != recorded_hash {
167 warn!("journal is corrupt, has invalid hash, skipping recovery");
168 return;
169 };
170 if len == 0 {
171 info!("journal is empty, no recovery necessary");
172 return;
173 };
174 let mut recovered_bytes_total = 0;
175 let mut journal_offset = OFFSETOF_ENTRIES;
176 while journal_offset < len {
177 let offset = raw.read_u64_be_at(journal_offset);
178 journal_offset += 8;
179 let data_len = raw.read_u32_be_at(journal_offset);
180 journal_offset += 4;
181 let data = TinyBuf::from_slice(raw.read_at(journal_offset, data_len.into()));
182 journal_offset += u64::from(data_len);
183 self.device.write_at(offset, data).await;
184 recovered_bytes_total += data_len;
185 }
186 self.device.sync_data().await;
187
188 self
190 .device
191 .write_at(self.offset, self.generate_blank_state())
192 .await;
193 self.device.sync_data().await;
194 info!(
195 recovered_entries = len,
196 recovered_bytes = recovered_bytes_total,
197 "journal has been recovered"
198 );
199 }
200
201 pub fn begin_transaction(&self) -> Transaction {
202 let serial_no = self
203 .next_txn_serial_no
204 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
205 Transaction::new(serial_no, self.overlay.clone())
206 }
207
208 pub async fn commit_transaction(&self, txn: Transaction) {
209 let (fut, fut_ctl) = SignalFuture::new();
210 let None = self.pending.insert(txn.serial_no, (txn, fut_ctl)) else {
211 unreachable!();
212 };
213 fut.await;
214 }
215
216 #[cfg(any(feature = "io_mmap", feature = "io_file"))]
218 pub async fn read_with_overlay(&self, offset: u64, len: u64) -> TinyBuf {
219 if let Some(e) = self.overlay.get(&offset) {
220 let overlay_len = e.value().data.len();
221 assert_eq!(
222 overlay_len,
223 usz!(len),
224 "overlay data at {offset} has length {overlay_len} but requested length {len}"
225 );
226 e.value().data.clone()
227 } else {
228 self.device.read_at(offset, len).await.into()
229 }
230 }
231
232 pub async fn clear_from_overlay(&self, offset: u64) {
235 self.overlay.remove(&offset);
236 }
237
238 #[cfg(any(feature = "io_mmap", feature = "io_file"))]
239 pub async fn start_commit_background_loop(&self) {
240 let mut next_serial = 0;
241
242 let mut writes = Vec::new();
244 let mut fut_ctls = Vec::new();
245 let mut overlays_to_delete = Vec::new();
246 loop {
247 sleep(self.commit_delay).await;
248
249 let mut len = 0;
250 let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
251 while let Some((serial_no, (txn, fut_ctl))) = self.pending.remove(&next_serial) {
253 let entry_len = txn.serialised_byte_len();
254 if len + entry_len > self.capacity - OFFSETOF_ENTRIES {
255 let None = self.pending.insert(serial_no, (txn, fut_ctl)) else {
258 unreachable!();
259 };
260 break;
261 };
262 next_serial += 1;
263 for w in txn.writes {
264 let data_len: u32 = w.data.len().try_into().unwrap();
265 raw.extend_from_slice(&w.offset.to_be_bytes());
266 raw.extend_from_slice(&data_len.to_be_bytes());
267 raw.extend_from_slice(&w.data);
268 writes.push(WriteRequest::new(w.offset, w.data));
269 if w.is_overlay {
270 overlays_to_delete.push((w.offset, serial_no));
271 };
272 }
273 len += entry_len;
274 fut_ctls.push(fut_ctl);
275 }
276 if fut_ctls.is_empty() {
277 assert!(overlays_to_delete.is_empty());
279 assert!(writes.is_empty());
280 continue;
281 };
282 raw.write_u32_be_at(OFFSETOF_LEN, u32::try_from(len).unwrap());
283 let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
284 raw.write_at(OFFSETOF_HASH, hash.as_bytes());
285 self
286 .device
287 .write_at_with_delayed_sync(once(WriteRequest::new(self.offset, raw)))
288 .await;
289
290 self
291 .device
292 .write_at_with_delayed_sync(writes.drain(..))
293 .await;
294
295 for fut_ctl in fut_ctls.drain(..) {
296 fut_ctl.signal(());
297 }
298
299 for (offset, serial_no) in overlays_to_delete.drain(..) {
300 self
301 .overlay
302 .remove_if(&offset, |_, e| e.serial_no <= serial_no);
303 }
304
305 self
307 .device
308 .write_at(self.offset, self.generate_blank_state())
309 .await;
310 self.device.sync_data().await;
311 }
312 }
313}