1use dashmap::DashMap;
2use off64::int::Off64ReadInt;
3use off64::int::Off64WriteMutInt;
4use off64::usz;
5use off64::Off64Read;
6use off64::Off64WriteMut;
7use seekable_async_file::SeekableAsyncFile;
8use seekable_async_file::WriteRequest;
9use signal_future::SignalFuture;
10use signal_future::SignalFutureController;
11use std::iter::once;
12use std::sync::atomic::AtomicU64;
13use std::sync::Arc;
14use std::time::Duration;
15use tinybuf::TinyBuf;
16use tokio::time::sleep;
17use tracing::info;
18use tracing::warn;
19
// On-device layout of the journal region, relative to the journal's base offset:
//
//   [hash: 32 bytes][len: u32 big-endian][entries: `len` bytes]

/// Position of the 32-byte BLAKE3 hash, which is computed over every byte from `OFFSETOF_LEN`
/// to the end of the serialised entries.
const OFFSETOF_HASH: u64 = 0;
/// Position of the big-endian `u32` holding the total byte length of the serialised entries.
const OFFSETOF_LEN: u64 = OFFSETOF_HASH + 32;
/// Position of the first serialised entry; equivalently, the size of the fixed header.
const OFFSETOF_ENTRIES: u64 = OFFSETOF_LEN + 4;
23
24pub struct TransactionWrite {
26 pub offset: u64,
27 pub data: TinyBuf,
28 pub is_overlay: bool,
29}
30
31pub struct Transaction {
32 serial_no: u64,
33 writes: Vec<TransactionWrite>,
34 overlay: Arc<DashMap<u64, OverlayEntry>>,
35}
36
37impl Transaction {
38 fn serialised_byte_len(&self) -> u64 {
39 u64::try_from(
40 self
41 .writes
42 .iter()
43 .map(|w| 8 + 4 + w.data.len())
44 .sum::<usize>(),
45 )
46 .unwrap()
47 }
48
49 pub fn new(serial_no: u64, overlay: Arc<DashMap<u64, OverlayEntry>>) -> Self {
51 Self {
52 serial_no,
53 writes: Vec::new(),
54 overlay,
55 }
56 }
57
58 pub fn serial_no(&self) -> u64 {
59 self.serial_no
60 }
61
62 pub fn into_writes(self) -> Vec<TransactionWrite> {
63 self.writes
64 }
65
66 pub fn write<D: Into<TinyBuf>>(&mut self, offset: u64, data: D) -> &mut Self {
68 let data = data.into();
69 self.writes.push(TransactionWrite {
70 offset,
71 data,
72 is_overlay: false,
73 });
74 self
75 }
76
77 pub fn write_with_overlay<D: Into<TinyBuf>>(&mut self, offset: u64, data: D) -> &mut Self {
80 let data = data.into();
81 self.overlay.insert(offset, OverlayEntry {
82 data: data.clone(),
83 serial_no: self.serial_no,
84 });
85 self.writes.push(TransactionWrite {
86 offset,
87 data,
88 is_overlay: true,
89 });
90 self
91 }
92}
93
94pub struct OverlayEntry {
98 pub data: TinyBuf,
99 pub serial_no: u64,
100}
101
102pub struct WriteJournal {
103 device: SeekableAsyncFile,
104 offset: u64,
105 capacity: u64,
106 pending: DashMap<u64, (Transaction, SignalFutureController)>,
107 next_txn_serial_no: AtomicU64,
108 commit_delay: Duration,
109 overlay: Arc<DashMap<u64, OverlayEntry>>,
110}
111
112impl WriteJournal {
113 pub fn new(
114 device: SeekableAsyncFile,
115 offset: u64,
116 capacity: u64,
117 commit_delay: Duration,
118 ) -> Self {
119 assert!(capacity > OFFSETOF_ENTRIES && capacity <= u32::MAX.into());
120 Self {
121 device,
122 offset,
123 capacity,
124 pending: Default::default(),
125 next_txn_serial_no: AtomicU64::new(0),
126 commit_delay,
127 overlay: Default::default(),
128 }
129 }
130
131 pub fn generate_blank_state(&self) -> Vec<u8> {
132 let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
133 raw.write_u32_be_at(OFFSETOF_LEN, 0u32);
134 let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
135 raw.write_at(OFFSETOF_HASH, hash.as_bytes());
136 raw
137 }
138
139 #[cfg(any(feature = "io_mmap", feature = "io_file"))]
140 pub async fn format_device(&self) {
141 self
142 .device
143 .write_at(self.offset, self.generate_blank_state())
144 .await;
145 }
146
147 #[cfg(any(feature = "io_mmap", feature = "io_file"))]
148 pub async fn recover(&self) {
149 let mut raw = self
150 .device
151 .read_at(self.offset, OFFSETOF_ENTRIES)
152 .await
153 .to_vec();
154 let len: u64 = raw.read_u32_be_at(OFFSETOF_LEN).into();
155 if len > self.capacity - OFFSETOF_ENTRIES {
156 warn!("journal is corrupt, has invalid length, skipping recovery");
157 return;
158 };
159 raw.extend_from_slice(
160 &mut self
161 .device
162 .read_at(self.offset + OFFSETOF_ENTRIES, len)
163 .await,
164 );
165 let expected_hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
166 let recorded_hash = &raw[..usz!(OFFSETOF_LEN)];
167 if expected_hash.as_bytes() != recorded_hash {
168 warn!("journal is corrupt, has invalid hash, skipping recovery");
169 return;
170 };
171 if len == 0 {
172 info!("journal is empty, no recovery necessary");
173 return;
174 };
175 let mut recovered_bytes_total = 0;
176 let mut journal_offset = OFFSETOF_ENTRIES;
177 while journal_offset < len {
178 let offset = raw.read_u64_be_at(journal_offset);
179 journal_offset += 8;
180 let data_len = raw.read_u32_be_at(journal_offset);
181 journal_offset += 4;
182 let data = TinyBuf::from_slice(raw.read_at(journal_offset, data_len.into()));
183 journal_offset += u64::from(data_len);
184 self.device.write_at(offset, data).await;
185 recovered_bytes_total += data_len;
186 }
187 self.device.sync_data().await;
188
189 self
191 .device
192 .write_at(self.offset, self.generate_blank_state())
193 .await;
194 self.device.sync_data().await;
195 info!(
196 recovered_entries = len,
197 recovered_bytes = recovered_bytes_total,
198 "journal has been recovered"
199 );
200 }
201
202 pub fn begin_transaction(&self) -> Transaction {
203 let serial_no = self
204 .next_txn_serial_no
205 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
206 Transaction::new(serial_no, self.overlay.clone())
207 }
208
209 pub async fn commit_transaction(&self, txn: Transaction) {
210 let (fut, fut_ctl) = SignalFuture::new();
211 let None = self.pending.insert(txn.serial_no, (txn, fut_ctl)) else {
212 unreachable!();
213 };
214 fut.await;
215 }
216
217 #[cfg(any(feature = "io_mmap", feature = "io_file"))]
219 pub async fn read_with_overlay(&self, offset: u64, len: u64) -> TinyBuf {
220 if let Some(e) = self.overlay.get(&offset) {
221 let overlay_len = e.value().data.len();
222 assert_eq!(
223 overlay_len,
224 usz!(len),
225 "overlay data at {offset} has length {overlay_len} but requested length {len}"
226 );
227 e.value().data.clone()
228 } else {
229 self.device.read_at(offset, len).await.into()
230 }
231 }
232
233 pub async fn clear_from_overlay(&self, offset: u64) {
236 self.overlay.remove(&offset);
237 }
238
239 #[cfg(any(feature = "io_mmap", feature = "io_file"))]
240 pub async fn start_commit_background_loop(&self) {
241 let mut next_serial = 0;
242
243 let mut writes = Vec::new();
245 let mut fut_ctls = Vec::new();
246 let mut overlays_to_delete = Vec::new();
247 loop {
248 sleep(self.commit_delay).await;
249
250 let mut len = 0;
251 let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
252 while let Some((serial_no, (txn, fut_ctl))) = self.pending.remove(&next_serial) {
254 let entry_len = txn.serialised_byte_len();
255 if len + entry_len > self.capacity - OFFSETOF_ENTRIES {
256 let None = self.pending.insert(serial_no, (txn, fut_ctl)) else {
259 unreachable!();
260 };
261 break;
262 };
263 next_serial += 1;
264 for w in txn.writes {
265 let data_len: u32 = w.data.len().try_into().unwrap();
266 raw.extend_from_slice(&w.offset.to_be_bytes());
267 raw.extend_from_slice(&data_len.to_be_bytes());
268 raw.extend_from_slice(&w.data);
269 writes.push(WriteRequest::new(w.offset, w.data));
270 if w.is_overlay {
271 overlays_to_delete.push((w.offset, serial_no));
272 };
273 }
274 len += entry_len;
275 fut_ctls.push(fut_ctl);
276 }
277 if fut_ctls.is_empty() {
278 assert!(overlays_to_delete.is_empty());
280 assert!(writes.is_empty());
281 continue;
282 };
283 raw.write_u32_be_at(OFFSETOF_LEN, u32::try_from(len).unwrap());
284 let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
285 raw.write_at(OFFSETOF_HASH, hash.as_bytes());
286 self
287 .device
288 .write_at_with_delayed_sync(once(WriteRequest::new(self.offset, raw)))
289 .await;
290
291 self
292 .device
293 .write_at_with_delayed_sync(writes.drain(..))
294 .await;
295
296 for fut_ctl in fut_ctls.drain(..) {
297 fut_ctl.signal(());
298 }
299
300 for (offset, serial_no) in overlays_to_delete.drain(..) {
301 self
302 .overlay
303 .remove_if(&offset, |_, e| e.serial_no <= serial_no);
304 }
305
306 self
308 .device
309 .write_at(self.offset, self.generate_blank_state())
310 .await;
311 self.device.sync_data().await;
312 }
313 }
314}