1use dashmap::DashMap;
2use off64::int::Off64ReadInt;
3use off64::int::Off64WriteMutInt;
4use off64::usz;
5use off64::Off64Read;
6use off64::Off64WriteMut;
7use seekable_async_file::SeekableAsyncFile;
8use seekable_async_file::WriteRequest;
9use signal_future::SignalFuture;
10use signal_future::SignalFutureController;
11use std::iter::once;
12use std::sync::atomic::AtomicU64;
13use std::sync::Arc;
14use std::time::Duration;
15use tinybuf::TinyBuf;
16use tokio::time::sleep;
17use tracing::info;
18use tracing::warn;
19
// On-device journal header layout:
//   [0, 32)  BLAKE3 hash of everything that follows the hash (length + entries).
//   [32, 36) big-endian u32: byte length of the serialised entries only
//            (the header itself is excluded from this count).
//   [36, ..) serialised entries: per write, u64 BE offset, u32 BE data length, data.
const OFFSETOF_HASH: u64 = 0;
const OFFSETOF_LEN: u64 = OFFSETOF_HASH + 32;
const OFFSETOF_ENTRIES: u64 = OFFSETOF_LEN + 4;
23
/// A single queued device write belonging to a `Transaction`.
struct TransactionWrite {
  // Absolute device offset the data will be written to.
  offset: u64,
  data: TinyBuf,
  // True when this write was also inserted into the shared overlay; the
  // commit loop uses this to know which overlay entries to evict afterwards.
  is_overlay: bool,
}
29
/// A batch of writes that is journalled and applied atomically.
/// Created by `WriteJournal::begin_transaction` and submitted via
/// `WriteJournal::commit_transaction`.
pub struct Transaction {
  // Unique, monotonically increasing number; the commit loop flushes
  // transactions strictly in serial-number order.
  serial_no: u64,
  writes: Vec<TransactionWrite>,
  // Shared with the owning WriteJournal; overlay writes become visible to
  // `read_with_overlay` immediately, before reaching the device.
  overlay: Arc<DashMap<u64, OverlayEntry>>,
}
35
36impl Transaction {
37 fn serialised_byte_len(&self) -> u64 {
38 u64::try_from(
39 self
40 .writes
41 .iter()
42 .map(|w| 8 + 4 + w.data.len())
43 .sum::<usize>(),
44 )
45 .unwrap()
46 }
47
48 pub fn write<D: Into<TinyBuf>>(&mut self, offset: u64, data: D) -> &mut Self {
50 let data = data.into();
51 self.writes.push(TransactionWrite {
52 offset,
53 data,
54 is_overlay: false,
55 });
56 self
57 }
58
59 pub fn write_with_overlay<D: Into<TinyBuf>>(&mut self, offset: u64, data: D) -> &mut Self {
62 let data = data.into();
63 self.overlay.insert(offset, OverlayEntry {
64 data: data.clone(),
65 serial_no: self.serial_no,
66 });
67 self.writes.push(TransactionWrite {
68 offset,
69 data,
70 is_overlay: true,
71 });
72 self
73 }
74}
75
/// Value held in the overlay map: the most recently written data for an
/// offset, tagged with the serial number of the transaction that wrote it.
struct OverlayEntry {
  data: TinyBuf,
  // Used by the commit loop's `remove_if` so a commit only evicts entries it
  // wrote itself, never a newer overwrite from a later transaction.
  serial_no: u64,
}
82
/// A write-ahead journal living in a fixed region of a device. Committed
/// transactions are first serialised into the journal region, then applied
/// to their real locations, so interrupted writes can be replayed on restart.
pub struct WriteJournal {
  device: SeekableAsyncFile,
  // Start of the journal region on the device.
  offset: u64,
  // Total bytes reserved for the journal region (header + entries).
  capacity: u64,
  // Committed-but-not-yet-flushed transactions, keyed by serial number; the
  // controller is signalled once the transaction's writes are on the device.
  pending: DashMap<u64, (Transaction, SignalFutureController)>,
  // Source of unique serial numbers for new transactions.
  next_txn_serial_no: AtomicU64,
  // Sleep interval between flush passes of the background commit loop.
  commit_delay: Duration,
  // Latest written values keyed by exact offset, readable before flush.
  overlay: Arc<DashMap<u64, OverlayEntry>>,
}
92
93impl WriteJournal {
94 pub fn new(
95 device: SeekableAsyncFile,
96 offset: u64,
97 capacity: u64,
98 commit_delay: Duration,
99 ) -> Self {
100 assert!(capacity > OFFSETOF_ENTRIES && capacity <= u32::MAX.into());
101 Self {
102 device,
103 offset,
104 capacity,
105 pending: Default::default(),
106 next_txn_serial_no: AtomicU64::new(0),
107 commit_delay,
108 overlay: Default::default(),
109 }
110 }
111
112 pub fn generate_blank_state(&self) -> Vec<u8> {
113 let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
114 raw.write_u32_be_at(OFFSETOF_LEN, 0u32);
115 let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
116 raw.write_at(OFFSETOF_HASH, hash.as_bytes());
117 raw
118 }
119
120 #[cfg(any(feature = "io_mmap", feature = "io_file"))]
121 pub async fn format_device(&self) {
122 self
123 .device
124 .write_at(self.offset, self.generate_blank_state())
125 .await;
126 }
127
128 #[cfg(any(feature = "io_mmap", feature = "io_file"))]
129 pub async fn recover(&self) {
130 let mut raw = self
131 .device
132 .read_at(self.offset, OFFSETOF_ENTRIES)
133 .await
134 .to_vec();
135 let len: u64 = raw.read_u32_be_at(OFFSETOF_LEN).into();
136 if len > self.capacity - OFFSETOF_ENTRIES {
137 warn!("journal is corrupt, has invalid length, skipping recovery");
138 return;
139 };
140 raw.extend_from_slice(
141 &mut self
142 .device
143 .read_at(self.offset + OFFSETOF_ENTRIES, len)
144 .await,
145 );
146 let expected_hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
147 let recorded_hash = &raw[..usz!(OFFSETOF_LEN)];
148 if expected_hash.as_bytes() != recorded_hash {
149 warn!("journal is corrupt, has invalid hash, skipping recovery");
150 return;
151 };
152 if len == 0 {
153 info!("journal is empty, no recovery necessary");
154 return;
155 };
156 let mut recovered_bytes_total = 0;
157 let mut journal_offset = OFFSETOF_ENTRIES;
158 while journal_offset < len {
159 let offset = raw.read_u64_be_at(journal_offset);
160 journal_offset += 8;
161 let data_len = raw.read_u32_be_at(journal_offset);
162 journal_offset += 4;
163 let data = TinyBuf::from_slice(raw.read_at(journal_offset, data_len.into()));
164 journal_offset += u64::from(data_len);
165 self.device.write_at(offset, data).await;
166 recovered_bytes_total += data_len;
167 }
168 self.device.sync_data().await;
169
170 self
172 .device
173 .write_at(self.offset, self.generate_blank_state())
174 .await;
175 self.device.sync_data().await;
176 info!(
177 recovered_entries = len,
178 recovered_bytes = recovered_bytes_total,
179 "journal has been recovered"
180 );
181 }
182
183 pub fn begin_transaction(&self) -> Transaction {
184 let serial_no = self
185 .next_txn_serial_no
186 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
187 Transaction {
188 serial_no,
189 writes: Vec::new(),
190 overlay: self.overlay.clone(),
191 }
192 }
193
194 pub async fn commit_transaction(&self, txn: Transaction) {
195 let (fut, fut_ctl) = SignalFuture::new();
196 let None = self.pending.insert(txn.serial_no, (txn, fut_ctl)) else {
197 unreachable!();
198 };
199 fut.await;
200 }
201
202 #[cfg(any(feature = "io_mmap", feature = "io_file"))]
204 pub async fn read_with_overlay(&self, offset: u64, len: u64) -> TinyBuf {
205 if let Some(e) = self.overlay.get(&offset) {
206 let overlay_len = e.value().data.len();
207 assert_eq!(
208 overlay_len,
209 usz!(len),
210 "overlay data at {offset} has length {overlay_len} but requested length {len}"
211 );
212 e.value().data.clone()
213 } else {
214 self.device.read_at(offset, len).await.into()
215 }
216 }
217
218 pub async fn clear_from_overlay(&self, offset: u64) {
221 self.overlay.remove(&offset);
222 }
223
224 #[cfg(any(feature = "io_mmap", feature = "io_file"))]
225 pub async fn start_commit_background_loop(&self) {
226 let mut next_serial = 0;
227
228 let mut writes = Vec::new();
230 let mut fut_ctls = Vec::new();
231 let mut overlays_to_delete = Vec::new();
232 loop {
233 sleep(self.commit_delay).await;
234
235 let mut len = 0;
236 let mut raw = vec![0u8; usz!(OFFSETOF_ENTRIES)];
237 while let Some((serial_no, (txn, fut_ctl))) = self.pending.remove(&next_serial) {
239 let entry_len = txn.serialised_byte_len();
240 if len + entry_len > self.capacity - OFFSETOF_ENTRIES {
241 let None = self.pending.insert(serial_no, (txn, fut_ctl)) else {
244 unreachable!();
245 };
246 break;
247 };
248 next_serial += 1;
249 for w in txn.writes {
250 let data_len: u32 = w.data.len().try_into().unwrap();
251 raw.extend_from_slice(&w.offset.to_be_bytes());
252 raw.extend_from_slice(&data_len.to_be_bytes());
253 raw.extend_from_slice(&w.data);
254 writes.push(WriteRequest::new(w.offset, w.data));
255 if w.is_overlay {
256 overlays_to_delete.push((w.offset, serial_no));
257 };
258 }
259 len += entry_len;
260 fut_ctls.push(fut_ctl);
261 }
262 if fut_ctls.is_empty() {
263 assert!(overlays_to_delete.is_empty());
265 assert!(writes.is_empty());
266 continue;
267 };
268 raw.write_u32_be_at(OFFSETOF_LEN, u32::try_from(len).unwrap());
269 let hash = blake3::hash(&raw[usz!(OFFSETOF_LEN)..]);
270 raw.write_at(OFFSETOF_HASH, hash.as_bytes());
271 self
272 .device
273 .write_at_with_delayed_sync(once(WriteRequest::new(self.offset, raw)))
274 .await;
275
276 self
277 .device
278 .write_at_with_delayed_sync(writes.drain(..))
279 .await;
280
281 for fut_ctl in fut_ctls.drain(..) {
282 fut_ctl.signal(());
283 }
284
285 for (offset, serial_no) in overlays_to_delete.drain(..) {
286 self
287 .overlay
288 .remove_if(&offset, |_, e| e.serial_no <= serial_no);
289 }
290
291 self
293 .device
294 .write_at(self.offset, self.generate_blank_state())
295 .await;
296 self.device.sync_data().await;
297 }
298 }
299}