foyer_storage/large/
tombstone.rs

1// Copyright 2025 foyer Project Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{
16    path::{Path, PathBuf},
17    sync::Arc,
18};
19
20use bytes::{Buf, BufMut};
21use foyer_common::{bits, metrics::Metrics};
22use futures_util::future::try_join_all;
23use tokio::sync::Mutex;
24
25use crate::{
26    device::{
27        direct_file::DirectFileDevice,
28        monitor::{Monitored, MonitoredConfig},
29        Dev, DevExt, RegionId,
30    },
31    error::{Error, Result},
32    io::{buffer::IoBuffer, PAGE},
33    DirectFileDeviceOptions, IopsCounter, Runtime,
34};
35
/// The configurations for the tombstone log.
///
/// Usually constructed via [`TombstoneLogConfigBuilder`].
#[derive(Debug, Clone)]
pub struct TombstoneLogConfig {
    /// Path of the tombstone log.
    pub path: PathBuf,
    /// If enabled, `sync` will be called after writes to make sure the data is safely persisted on the device.
    pub flush: bool,
    /// iops_counter for the tombstone log device monitor.
    pub iops_counter: IopsCounter,
}
46
/// The builder for the tombstone log config.
#[derive(Debug)]
pub struct TombstoneLogConfigBuilder {
    /// Path of the tombstone log file.
    path: PathBuf,
    /// Whether to `sync` after writes; defaults to `true`.
    flush: bool,
    /// Iops counter for the device monitor; defaults to per-I/O counting.
    iops_counter: IopsCounter,
}
54
55impl TombstoneLogConfigBuilder {
56    /// Create a new builder for the tombstone log config on the given path.
57    pub fn new(path: impl AsRef<Path>) -> Self {
58        Self {
59            path: path.as_ref().into(),
60            flush: true,
61            iops_counter: IopsCounter::per_io(),
62        }
63    }
64
65    /// Set whether to enable flush.
66    ///
67    /// If enabled, `sync` will be called after writes to make sure the data is safely persisted on the device.
68    pub fn with_flush(mut self, flush: bool) -> Self {
69        self.flush = flush;
70        self
71    }
72
73    /// Set iops counter for the tombstone log device monitor.
74    pub fn with_iops_counter(mut self, iops_counter: IopsCounter) -> Self {
75        self.iops_counter = iops_counter;
76        self
77    }
78
79    /// Build the tombstone log config with the given args.
80    pub fn build(self) -> TombstoneLogConfig {
81        TombstoneLogConfig {
82            path: self.path,
83            flush: self.flush,
84            iops_counter: self.iops_counter,
85        }
86    }
87}
88
/// A tombstone records the removal of a cache entry.
#[derive(Debug, Clone)]
pub struct Tombstone {
    /// Hash that identifies the removed entry.
    pub hash: u64,
    /// Sequence of the removal; recovery treats the highest sequence as the newest tombstone.
    pub sequence: u64,
}
94
95impl Tombstone {
96    const fn serialized_len() -> usize {
97        8 + 8
98    }
99
100    fn write(&self, mut buf: impl BufMut) {
101        buf.put_u64(self.hash);
102        buf.put_u64(self.sequence);
103    }
104
105    fn read(mut buf: impl Buf) -> Self {
106        let hash = buf.get_u64();
107        let sequence = buf.get_u64();
108        Self { hash, sequence }
109    }
110}
111
/// A persistent, fixed-capacity tombstone log backed by a dedicated direct-I/O file.
///
/// Cloning is cheap: clones share the same inner state.
#[derive(Debug, Clone)]
pub struct TombstoneLog {
    // Serializes appends and shares the write cursor + page buffer among clones.
    inner: Arc<Mutex<TombstoneLogInner>>,
}
116
/// Mutable state of the tombstone log, guarded by the mutex in [`TombstoneLog`].
#[derive(Debug)]
struct TombstoneLogInner {
    /// Byte offset of the next slot to write; wraps at the log device capacity.
    offset: u64,
    /// Single-page write buffer over the dedicated tombstone log device.
    buffer: PageBuffer<Monitored<DirectFileDevice>>,
}
122
impl TombstoneLog {
    /// Chunk size (in bytes) of the concurrent recovery reads.
    const RECOVER_IO_SIZE: usize = 128 * 1024;

    /// Open the tombstone log on a dedicated direct-I/O file device at `config.path`.
    ///
    /// The tombstone log will be fully scanned on open:
    /// - every persisted tombstone is appended to `tombstones`;
    /// - the write cursor is restored to the slot right after the tombstone with the highest
    ///   sequence, so later [`Self::append`] calls continue from there.
    ///
    /// `cache_device` is only used to size the log: one slot per `align` bytes of cache capacity.
    pub async fn open<D>(
        config: &TombstoneLogConfig,
        cache_device: D,
        tombstones: &mut Vec<Tombstone>,
        metrics: Arc<Metrics>,
        runtime: Runtime,
    ) -> Result<Self>
    where
        D: Dev,
    {
        let align = cache_device.align();

        // For large entry disk cache, the minimum entry size is the alignment.
        //
        // So, the tombstone log needs at most `cache device capacity / align` slots.
        //
        // For the alignment is 4K and the slot size is 16B, tombstone log requires 1/256 of the cache device size.
        let capacity = bits::align_up(align, (cache_device.capacity() / align) * Tombstone::serialized_len());

        // Each region of the log device is one `align`-sized unit.
        let device = Monitored::open(
            MonitoredConfig {
                config: DirectFileDeviceOptions::new(&config.path)
                    .with_region_size(align)
                    .with_capacity(capacity)
                    .into(),
                metrics: metrics.clone(),
            },
            runtime,
        )
        .await?;

        // Scan the whole log concurrently in `RECOVER_IO_SIZE` chunks (ceil division).
        let tasks = bits::align_up(Self::RECOVER_IO_SIZE, capacity) / Self::RECOVER_IO_SIZE;
        tracing::trace!("[tombstone log]: recover task count: {tasks}");
        let res = try_join_all((0..tasks).map(|i| {
            let offset = i * Self::RECOVER_IO_SIZE;
            // The last chunk may be shorter than `RECOVER_IO_SIZE`.
            let len = std::cmp::min(offset + Self::RECOVER_IO_SIZE, capacity) - offset;
            let device = device.clone();
            async move {
                let buf = IoBuffer::new(len);
                let (buffer, res) = device.pread(buf, offset as _).await;
                res?;

                // Track the tombstone with the highest sequence (and its byte address) in this chunk.
                let mut seq = 0;
                let mut addr = 0;
                let mut hash = 0;
                let mut tombstones = vec![];

                for (slot, buf) in buffer.chunks_exact(Tombstone::serialized_len()).enumerate() {
                    let tombstone = Tombstone::read(buf);
                    if tombstone.sequence > seq {
                        seq = tombstone.sequence;
                        addr = offset + slot * Tombstone::serialized_len();
                        hash = tombstone.hash
                    }
                    tombstones.push(tombstone);
                }

                Ok::<_, Error>((tombstones, seq, addr, hash))
            }
        }))
        .await?;
        // Pick the chunk holding the globally highest sequence; its `addr` is the latest slot.
        let offset = res
            .iter()
            .reduce(|a, b| if a.1 > b.1 { a } else { b })
            .inspect(|(_, seq, addr, hash)| {
                tracing::trace!(
                    "[tombstone log]: found latest tombstone at {addr} with sequence = {seq}, hash = {hash}"
                )
            })
            .map(|(_, _, addr, _)| *addr)
            // unwrap is safe: `tasks >= 1`, so `res` is never empty.
            .unwrap() as u64;

        res.into_iter().for_each(|mut r| tombstones.append(&mut r.0));
        // Resume writing at the slot after the latest tombstone, wrapping at capacity.
        //
        // NOTE(review): on a fresh (all-zero) log this resolves to one slot past the start, so
        // slot 0 is never written — appears intentional (the unit test asserts this offset).
        let offset = (offset + Tombstone::serialized_len() as u64) % capacity as u64;

        // Load the region (page) containing the resume offset into the write buffer.
        let region = bits::align_down(align as RegionId, offset as RegionId) / align as RegionId;
        let buffer = PageBuffer::open(device, region, 0, config.flush).await?;

        Ok(Self {
            inner: Arc::new(Mutex::new(TombstoneLogInner { offset, buffer })),
        })
    }

    /// Append tombstones to the log sequentially, then flush the last touched page.
    ///
    /// Whenever the write cursor sits on a page boundary, the previous page is flushed before
    /// the page containing the cursor is loaded.
    pub async fn append(&self, tombstones: impl Iterator<Item = &Tombstone>) -> Result<()> {
        let mut inner = self.inner.lock().await;

        let align = inner.buffer.device.align();

        for tombstone in tombstones {
            // Cursor crossed a page boundary: persist the full page and load the next one.
            if bits::is_aligned(align as u64, inner.offset) {
                inner.buffer.flush().await?;
                let region = bits::align_down(align as RegionId, inner.offset as RegionId) / align as RegionId;
                inner.buffer.load(region, 0).await?;
            }
            // Serialize into the slot at the in-page offset.
            let start = inner.offset as usize % align;
            let end = start + Tombstone::serialized_len();
            tombstone.write(&mut inner.buffer.as_mut()[start..end]);

            // Advance the cursor one slot, wrapping at the log device capacity.
            inner.offset = (inner.offset + Tombstone::serialized_len() as u64) % inner.buffer.device.capacity() as u64;
        }

        inner.buffer.flush().await
    }
}
233
/// A one-page read/modify/write buffer addressing a single page `(region, idx)` of a device.
#[derive(Debug)]
pub struct PageBuffer<D> {
    /// Region currently held in the buffer.
    region: RegionId,
    /// Page index within the region.
    idx: u32,
    // NOTE: This is always `Some(..)`.
    //
    // It is only taken transiently inside `update`/`flush` (which hold `&mut self`) to move the
    // buffer through the device I/O calls, and is restored before they return.
    buffer: Option<IoBuffer>,

    /// Underlying device.
    device: D,

    /// Whether to `sync` the region after every flush.
    sync: bool,
}
245
246impl<D> AsRef<[u8]> for PageBuffer<D> {
247    fn as_ref(&self) -> &[u8] {
248        self.buffer.as_ref().unwrap()
249    }
250}
251
252impl<D> AsMut<[u8]> for PageBuffer<D> {
253    fn as_mut(&mut self) -> &mut [u8] {
254        self.buffer.as_mut().unwrap()
255    }
256}
257
impl<D> PageBuffer<D>
where
    D: Dev,
{
    /// Create a page buffer over `device` and fill it with the page at (`region`, `idx`).
    ///
    /// When `sync` is set, every [`Self::flush`] also syncs the written region.
    pub async fn open(device: D, region: RegionId, idx: u32, sync: bool) -> Result<Self> {
        let mut this = Self {
            region,
            idx,
            buffer: Some(IoBuffer::new(PAGE)),
            device,
            sync,
        };

        this.update().await?;

        Ok(this)
    }

    /// Re-read the currently addressed page from the device into the buffer.
    pub async fn update(&mut self) -> Result<()> {
        // Take the buffer out so it can be moved into the device read; it is restored before the
        // error (if any) propagates, preserving the `Some(..)` invariant.
        let buf = self.buffer.take().unwrap();
        let (buf, res) = self
            .device
            .read(buf, self.region, Self::offset(self.device.align(), self.idx))
            .await;
        self.buffer = Some(buf);
        res?;
        Ok(())
    }

    /// Point the buffer at another page and read it from the device.
    pub async fn load(&mut self, region: RegionId, idx: u32) -> Result<()> {
        self.region = region;
        self.idx = idx;
        self.update().await
    }

    /// Write the buffered page back to the device, syncing the region if `sync` is set.
    pub async fn flush(&mut self) -> Result<()> {
        // Same take/restore dance as `update` to move the buffer through the device write.
        let buf = self.buffer.take().unwrap();
        let (buf, res) = self
            .device
            .write(buf, self.region, Self::offset(self.device.align(), self.idx))
            .await;
        self.buffer = Some(buf);
        res?;
        if self.sync {
            self.device.sync(Some(self.region)).await?;
        }
        Ok(())
    }

    /// Byte offset of page `idx` within a region, given the device alignment.
    fn offset(align: usize, idx: u32) -> u64 {
        align as u64 * idx as u64
    }
}
311
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use tempfile::tempdir;

    use super::*;
    use crate::device::direct_fs::{DirectFsDevice, DirectFsDeviceOptions};

    /// End-to-end test: append tombstones, check the write cursor, then reopen the log and
    /// check that recovery restores the cursor to the same position.
    #[test_log::test(tokio::test)]
    async fn test_tombstone_log() {
        let runtime = Runtime::current();

        let dir = tempdir().unwrap();

        // 4 MB cache device => 16 KB tombstone log => 1K tombstones
        let device = DirectFsDevice::open(
            DirectFsDeviceOptions::new(dir.path())
                .with_capacity(4 * 1024 * 1024)
                .into(),
            runtime.clone(),
        )
        .await
        .unwrap();

        let log = TombstoneLog::open(
            &TombstoneLogConfig {
                path: dir.path().join("test-tombstone-log"),
                flush: true,
                iops_counter: IopsCounter::per_io(),
            },
            device.clone(),
            &mut vec![],
            Arc::new(Metrics::noop()),
            runtime.clone(),
        )
        .await
        .unwrap();

        log.append(
            (0..3 * 1024 + 42)
                .map(|i| Tombstone { hash: i, sequence: i })
                .collect_vec()
                .iter(),
        )
        .await
        .unwrap();

        {
            let inner = log.inner.lock().await;
            // `open` starts a fresh log one slot past the start (slot 0 is skipped), hence `+ 1`.
            assert_eq!(
                inner.offset,
                (3 * 1024 + 42 + 1) * Tombstone::serialized_len() as u64 % (16 * 1024)
            )
        }

        drop(log);

        // Reopen the log from the same file: recovery must land the cursor on the same offset.
        let log = TombstoneLog::open(
            &TombstoneLogConfig {
                path: dir.path().join("test-tombstone-log"),
                flush: true,
                iops_counter: IopsCounter::per_io(),
            },
            device.clone(),
            &mut vec![],
            Arc::new(Metrics::noop()),
            runtime.clone(),
        )
        .await
        .unwrap();

        {
            let inner = log.inner.lock().await;
            assert_eq!(
                inner.offset,
                (3 * 1024 + 42 + 1) * Tombstone::serialized_len() as u64 % (16 * 1024)
            )
        }
    }
}
391}