1use std::{
16 path::{Path, PathBuf},
17 sync::Arc,
18};
19
20use bytes::{Buf, BufMut};
21use foyer_common::{bits, metrics::Metrics};
22use futures_util::future::try_join_all;
23use tokio::sync::Mutex;
24
25use crate::{
26 device::{
27 direct_file::DirectFileDevice,
28 monitor::{Monitored, MonitoredConfig},
29 Dev, DevExt, RegionId,
30 },
31 error::{Error, Result},
32 io::{buffer::IoBuffer, PAGE},
33 DirectFileDeviceOptions, IopsCounter, Runtime,
34};
35
/// Configuration for the tombstone log.
#[derive(Debug, Clone)]
pub struct TombstoneLogConfig {
    /// Path of the tombstone log file on disk.
    pub path: PathBuf,
    /// Whether to sync the log device after each flushed page write.
    pub flush: bool,
    /// Strategy used to count iops for the log device.
    // NOTE(review): not consumed by `TombstoneLog::open` in this file — confirm it
    // is applied to the device elsewhere.
    pub iops_counter: IopsCounter,
}
46
/// Builder for [`TombstoneLogConfig`].
#[derive(Debug)]
pub struct TombstoneLogConfigBuilder {
    // Log file path (required, set by `new`).
    path: PathBuf,
    // Sync after each flushed write; defaults to `true`.
    flush: bool,
    // Iops counting strategy; defaults to `IopsCounter::per_io()`.
    iops_counter: IopsCounter,
}
54
55impl TombstoneLogConfigBuilder {
56 pub fn new(path: impl AsRef<Path>) -> Self {
58 Self {
59 path: path.as_ref().into(),
60 flush: true,
61 iops_counter: IopsCounter::per_io(),
62 }
63 }
64
65 pub fn with_flush(mut self, flush: bool) -> Self {
69 self.flush = flush;
70 self
71 }
72
73 pub fn with_iops_counter(mut self, iops_counter: IopsCounter) -> Self {
75 self.iops_counter = iops_counter;
76 self
77 }
78
79 pub fn build(self) -> TombstoneLogConfig {
81 TombstoneLogConfig {
82 path: self.path,
83 flush: self.flush,
84 iops_counter: self.iops_counter,
85 }
86 }
87}
88
/// A record marking a cache entry as deleted.
#[derive(Debug, Clone)]
pub struct Tombstone {
    /// Hash of the deleted entry's key.
    pub hash: u64,
    /// Monotonic sequence of the deletion; the highest sequence marks the
    /// most recent log write (used by recovery to find the append position).
    pub sequence: u64,
}
94
95impl Tombstone {
96 const fn serialized_len() -> usize {
97 8 + 8
98 }
99
100 fn write(&self, mut buf: impl BufMut) {
101 buf.put_u64(self.hash);
102 buf.put_u64(self.sequence);
103 }
104
105 fn read(mut buf: impl Buf) -> Self {
106 let hash = buf.get_u64();
107 let sequence = buf.get_u64();
108 Self { hash, sequence }
109 }
110}
111
/// Append-only, ring-style log persisting [`Tombstone`]s across restarts.
///
/// Cloning is cheap; all clones share the same locked inner state.
#[derive(Debug, Clone)]
pub struct TombstoneLog {
    inner: Arc<Mutex<TombstoneLogInner>>,
}
116
#[derive(Debug)]
struct TombstoneLogInner {
    // Next write position in bytes, modulo the log device capacity.
    offset: u64,
    // Page-sized window over the device region containing `offset`.
    buffer: PageBuffer<Monitored<DirectFileDevice>>,
}
122
impl TombstoneLog {
    /// I/O granularity used when scanning the whole log during recovery.
    const RECOVER_IO_SIZE: usize = 128 * 1024;

    /// Open the tombstone log, recover all existing tombstones, and locate the
    /// append position.
    ///
    /// - `config`: log path and flush behavior.
    /// - `cache_device`: cache device whose capacity/alignment size the log
    ///   (one tombstone slot per `align` bytes of cache capacity).
    /// - `tombstones`: output vector; every recovered tombstone is appended to it.
    /// - `metrics`/`runtime`: forwarded to the monitored log device.
    ///
    /// # Errors
    ///
    /// Fails if the log device cannot be opened or any recovery read fails.
    pub async fn open<D>(
        config: &TombstoneLogConfig,
        cache_device: D,
        tombstones: &mut Vec<Tombstone>,
        metrics: Arc<Metrics>,
        runtime: Runtime,
    ) -> Result<Self>
    where
        D: Dev,
    {
        let align = cache_device.align();

        // One serialized tombstone per align-sized unit of the cache device,
        // rounded up to the device alignment.
        let capacity = bits::align_up(align, (cache_device.capacity() / align) * Tombstone::serialized_len());

        // NOTE(review): `config.iops_counter` is not applied here — confirm it is
        // consumed elsewhere.
        let device = Monitored::open(
            MonitoredConfig {
                config: DirectFileDeviceOptions::new(&config.path)
                    .with_region_size(align)
                    .with_capacity(capacity)
                    .into(),
                metrics: metrics.clone(),
            },
            runtime,
        )
        .await?;

        // Scan the whole log concurrently, one task per RECOVER_IO_SIZE chunk.
        let tasks = bits::align_up(Self::RECOVER_IO_SIZE, capacity) / Self::RECOVER_IO_SIZE;
        tracing::trace!("[tombstone log]: recover task count: {tasks}");
        let res = try_join_all((0..tasks).map(|i| {
            let offset = i * Self::RECOVER_IO_SIZE;
            // The last chunk may be shorter than RECOVER_IO_SIZE.
            let len = std::cmp::min(offset + Self::RECOVER_IO_SIZE, capacity) - offset;
            let device = device.clone();
            async move {
                let buf = IoBuffer::new(len);
                let (buffer, res) = device.pread(buf, offset as _).await;
                res?;

                // Track the entry with the highest sequence seen in this chunk;
                // its slot is the most recently written position in the chunk.
                let mut seq = 0;
                let mut addr = 0;
                let mut hash = 0;
                let mut tombstones = vec![];

                for (slot, buf) in buffer.chunks_exact(Tombstone::serialized_len()).enumerate() {
                    let tombstone = Tombstone::read(buf);
                    if tombstone.sequence > seq {
                        seq = tombstone.sequence;
                        addr = offset + slot * Tombstone::serialized_len();
                        hash = tombstone.hash
                    }
                    tombstones.push(tombstone);
                }

                Ok::<_, Error>((tombstones, seq, addr, hash))
            }
        }))
        .await?;
        // The globally-highest sequence marks the latest write; appending
        // resumes at the slot right after it (wrapping at capacity).
        let offset = res
            .iter()
            .reduce(|a, b| if a.1 > b.1 { a } else { b })
            .inspect(|(_, seq, addr, hash)| {
                tracing::trace!(
                    "[tombstone log]: found latest tombstone at {addr} with sequence = {seq}, hash = {hash}"
                )
            })
            .map(|(_, _, addr, _)| *addr)
            // `unwrap` relies on `tasks >= 1`, i.e. `capacity > 0` — TODO confirm
            // a zero-capacity cache device cannot reach this point.
            .unwrap() as u64;

        res.into_iter().for_each(|mut r| tombstones.append(&mut r.0));
        let offset = (offset + Tombstone::serialized_len() as u64) % capacity as u64;

        // Load the page buffer over the region containing the append offset.
        let region = bits::align_down(align as RegionId, offset as RegionId) / align as RegionId;
        let buffer = PageBuffer::open(device, region, 0, config.flush).await?;

        Ok(Self {
            inner: Arc::new(Mutex::new(TombstoneLogInner { offset, buffer })),
        })
    }

    /// Append tombstones to the log, then flush the page buffer.
    ///
    /// The offset advances modulo the log device capacity; whenever it lands on
    /// an alignment boundary the current page is flushed and the page for the
    /// new region is loaded.
    ///
    /// # Errors
    ///
    /// Fails if any flush or page load fails.
    pub async fn append(&self, tombstones: impl Iterator<Item = &Tombstone>) -> Result<()> {
        let mut inner = self.inner.lock().await;

        let align = inner.buffer.device.align();

        for tombstone in tombstones {
            // Crossing into a new aligned region: persist the current page and
            // load the one backing the new offset.
            if bits::is_aligned(align as u64, inner.offset) {
                inner.buffer.flush().await?;
                let region = bits::align_down(align as RegionId, inner.offset as RegionId) / align as RegionId;
                inner.buffer.load(region, 0).await?;
            }
            // Serialize into the in-memory page at the offset's slot.
            let start = inner.offset as usize % align;
            let end = start + Tombstone::serialized_len();
            tombstone.write(&mut inner.buffer.as_mut()[start..end]);

            inner.offset = (inner.offset + Tombstone::serialized_len() as u64) % inner.buffer.device.capacity() as u64;
        }

        inner.buffer.flush().await
    }
}
233
/// A single page-sized (`PAGE`) in-memory window over one page of a device region.
#[derive(Debug)]
pub struct PageBuffer<D> {
    // Region currently loaded.
    region: RegionId,
    // Page index within the region.
    idx: u32,
    // Backing io buffer; `None` only transiently while an I/O is in flight
    // (the buffer is taken for the call and restored afterwards).
    buffer: Option<IoBuffer>,

    device: D,

    // Whether to sync the device after each flush.
    sync: bool,
}
245
246impl<D> AsRef<[u8]> for PageBuffer<D> {
247 fn as_ref(&self) -> &[u8] {
248 self.buffer.as_ref().unwrap()
249 }
250}
251
252impl<D> AsMut<[u8]> for PageBuffer<D> {
253 fn as_mut(&mut self) -> &mut [u8] {
254 self.buffer.as_mut().unwrap()
255 }
256}
257
258impl<D> PageBuffer<D>
259where
260 D: Dev,
261{
262 pub async fn open(device: D, region: RegionId, idx: u32, sync: bool) -> Result<Self> {
263 let mut this = Self {
264 region,
265 idx,
266 buffer: Some(IoBuffer::new(PAGE)),
267 device,
268 sync,
269 };
270
271 this.update().await?;
272
273 Ok(this)
274 }
275
276 pub async fn update(&mut self) -> Result<()> {
277 let buf = self.buffer.take().unwrap();
278 let (buf, res) = self
279 .device
280 .read(buf, self.region, Self::offset(self.device.align(), self.idx))
281 .await;
282 self.buffer = Some(buf);
283 res?;
284 Ok(())
285 }
286
287 pub async fn load(&mut self, region: RegionId, idx: u32) -> Result<()> {
288 self.region = region;
289 self.idx = idx;
290 self.update().await
291 }
292
293 pub async fn flush(&mut self) -> Result<()> {
294 let buf = self.buffer.take().unwrap();
295 let (buf, res) = self
296 .device
297 .write(buf, self.region, Self::offset(self.device.align(), self.idx))
298 .await;
299 self.buffer = Some(buf);
300 res?;
301 if self.sync {
302 self.device.sync(Some(self.region)).await?;
303 }
304 Ok(())
305 }
306
307 fn offset(align: usize, idx: u32) -> u64 {
308 align as u64 * idx as u64
309 }
310}
311
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use tempfile::tempdir;

    use super::*;
    use crate::device::direct_fs::{DirectFsDevice, DirectFsDeviceOptions};

    /// End-to-end check: append tombstones, verify the write offset, then
    /// reopen the log and verify recovery lands on the same offset.
    #[test_log::test(tokio::test)]
    async fn test_tombstone_log() {
        let runtime = Runtime::current();

        let dir = tempdir().unwrap();

        // Cache device of 4 MiB; `TombstoneLog::open` derives the log capacity
        // from it as (capacity / align) * serialized_len.
        let device = DirectFsDevice::open(
            DirectFsDeviceOptions::new(dir.path())
                .with_capacity(4 * 1024 * 1024)
                .into(),
            runtime.clone(),
        )
        .await
        .unwrap();

        let log = TombstoneLog::open(
            &TombstoneLogConfig {
                path: dir.path().join("test-tombstone-log"),
                flush: true,
                iops_counter: IopsCounter::per_io(),
            },
            device.clone(),
            &mut vec![],
            Arc::new(Metrics::noop()),
            runtime.clone(),
        )
        .await
        .unwrap();

        log.append(
            (0..3 * 1024 + 42)
                .map(|i| Tombstone { hash: i, sequence: i })
                .collect_vec()
                .iter(),
        )
        .await
        .unwrap();

        {
            let inner = log.inner.lock().await;
            // The `+ 1` accounts for the slot skipped at open: on a fresh
            // (zeroed) log, recovery resolves the "latest" entry to slot 0, so
            // appending starts at slot 1. `16 * 1024` is presumably the derived
            // log capacity (4 MiB / 4 KiB align * 16 bytes per entry) — TODO
            // confirm against the device's actual alignment.
            assert_eq!(
                inner.offset,
                (3 * 1024 + 42 + 1) * Tombstone::serialized_len() as u64 % (16 * 1024)
            )
        }

        drop(log);

        // Reopen the same log file and expect recovery to reproduce the offset.
        let log = TombstoneLog::open(
            &TombstoneLogConfig {
                path: dir.path().join("test-tombstone-log"),
                flush: true,
                iops_counter: IopsCounter::per_io(),
            },
            device.clone(),
            &mut vec![],
            Arc::new(Metrics::noop()),
            runtime.clone(),
        )
        .await
        .unwrap();

        {
            let inner = log.inner.lock().await;
            assert_eq!(
                inner.offset,
                (3 * 1024 + 42 + 1) * Tombstone::serialized_len() as u64 % (16 * 1024)
            )
        }
    }
}