// indexedlog/log/fold.rs

1/*
2 * Copyright (c) Meta Platforms, Inc. and affiliates.
3 *
4 * This source code is licensed under the MIT license found in the
5 * LICENSE file in the root directory of this source tree.
6 */
7
8use std::any::Any;
9use std::fmt::Debug;
10use std::fs;
11use std::io;
12use std::path::Path;
13
14use vlqencoding::VLQDecode;
15use vlqencoding::VLQEncode;
16
17use crate::errors::IoResultExt;
18use crate::log::Log;
19use crate::utils::atomic_write_plain;
20use crate::utils::xxhash;
21use crate::Error;
22use crate::Result;
23
/// Definition of a "fold" function.
///
/// The "fold" function is similar to the "fold" function in stdlib.
/// It takes an initial state, processes new entries, then outputs
/// the new state.
///
/// Given an append-only `Log`, the state of a "fold" function can
/// be saved to disk and loaded later. A `Log` maintains the on-disk
/// state so "fold" calculation won't always start from scratch.
#[derive(Clone, Debug)]
pub struct FoldDef {
    /// Function to create an empty fold state.
    /// Also used to reset the state when the on-disk cache is invalid.
    pub(crate) create_fold: fn() -> Box<dyn Fold>,

    /// Name of the fold state.
    ///
    /// The name will be used as part of the fold file name ("fold-{name}").
    /// Therefore do not use user-generated content here. And do not abuse
    /// this by using `..` or `/`.
    ///
    /// When adding new or changing fold functions, use a different
    /// `name` to avoid reusing existing data incorrectly.
    pub(crate) name: &'static str,
}
47
/// The actual logic of a "fold" function, and its associated state.
pub trait Fold: Debug + 'static + Send + Sync {
    /// Load the fold state from bytes previously produced by `dump`.
    /// This will be called if an on-disk state exists.
    fn load(&mut self, state_bytes: &[u8]) -> io::Result<()>;

    /// Dump the fold state as bytes, suitable for a later `load`.
    fn dump(&self) -> io::Result<Vec<u8>>;

    /// Process a log entry. Update self state in place.
    /// Entries are fed in log order, each exactly once.
    fn accumulate(&mut self, entry: &[u8]) -> Result<()>;

    /// Downcast. Useful to access internal state without serialization cost.
    fn as_any(&self) -> &dyn Any;

    /// Clone the state. Needed because `Clone` itself is not object-safe
    /// and `Fold` is used as a trait object.
    fn clone_boxed(&self) -> Box<dyn Fold>;
}
66
/// State tracking the progress of a fold function.
#[derive(Debug)]
pub(crate) struct FoldState {
    /// Epoch. Useful to detect non-append-only changes.
    /// See also `LogMetadata`.
    pub(crate) epoch: u64,

    /// Offset of the next entry.
    /// All entries before this offset are already accumulated into `fold`.
    pub(crate) offset: u64,

    /// The state of the actual fold.
    pub(crate) fold: Box<dyn Fold>,

    /// How to reset the `fold` state.
    /// Also provides the `name` used for the on-disk cache file.
    def: FoldDef,
}
83
impl FoldDef {
    /// Create a "fold" definition.
    ///
    /// `name` is used as part of the on-disk cache file name; see the
    /// `name` field for restrictions.
    /// `create_fold` is a function to produce an empty "fold" state.
    pub fn new(name: &'static str, create_fold: fn() -> Box<dyn Fold>) -> Self {
        Self { create_fold, name }
    }

    /// Create a `FoldState` that has processed nothing:
    /// epoch 0, offset 0, and a fresh fold from `create_fold`.
    pub(crate) fn empty_state(&self) -> FoldState {
        FoldState {
            epoch: 0,
            offset: 0,
            fold: (self.create_fold)(),
            def: self.clone(),
        }
    }
}
101
102impl Clone for FoldState {
103    fn clone(&self) -> Self {
104        Self {
105            epoch: self.epoch,
106            offset: self.offset,
107            fold: self.fold.clone_boxed(),
108            def: self.def.clone(),
109        }
110    }
111}
112
113impl FoldState {
    /// Restore `epoch`, `offset` and the fold state from `path`.
    ///
    /// On-disk format (the inverse of `save_to_file`):
    /// - 8 bytes: big-endian xxhash checksum of the rest of the file.
    /// - VLQ-encoded `epoch`.
    /// - VLQ-encoded `offset`.
    /// - remaining bytes: the fold's own `dump()` output.
    ///
    /// Returns an `InvalidData` error (wrapped with path context) if the
    /// file is too short or the checksum does not match.
    pub(crate) fn load_from_file(&mut self, path: &Path) -> crate::Result<()> {
        (|| -> io::Result<()> {
            let data = fs::read(path)?;
            // The first 8 bytes are the checksum; anything shorter is corrupt.
            let checksum = match data.get(0..8) {
                Some(h) => u64::from_be_bytes(<[u8; 8]>::try_from(h).unwrap()),
                None => {
                    return Err(io::Error::new(
                        io::ErrorKind::InvalidData,
                        format!("corrupted FoldState (no checksum): {:?}", data),
                    ));
                }
            };
            // Verify payload integrity before attempting to decode it.
            if xxhash(&data[8..]) != checksum {
                return Err(io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!("corrupted FoldState (wrong checksum): {:?}", data),
                ));
            }
            let mut reader = &data[8..];
            let epoch = reader.read_vlq()?;
            let offset = reader.read_vlq()?;
            // Whatever remains after the two VLQs is the serialized fold.
            // `epoch`/`offset` are only assigned after `load` succeeds.
            self.fold.load(reader)?;
            self.epoch = epoch;
            self.offset = offset;
            Ok(())
        })()
        .context(path, "cannot read FoldState")
    }
142
143    pub(crate) fn save_to_file(&self, path: &Path) -> crate::Result<()> {
144        let data = (|| -> io::Result<Vec<u8>> {
145            let mut body = Vec::new();
146            body.write_vlq(self.epoch)?;
147            body.write_vlq(self.offset)?;
148            body.extend_from_slice(&self.fold.dump()?);
149            let checksum = xxhash(&body);
150            let mut data: Vec<u8> = checksum.to_be_bytes().to_vec();
151            data.extend_from_slice(&body);
152            Ok(data)
153        })()
154        .context(path, "cannot prepare FoldState")?;
155        atomic_write_plain(path, &data, false)
156    }
157
    /// Ensure the fold state is up-to-date with all on-disk entries.
    ///
    /// Read and write to on-disk caches transparently.
    ///
    /// Progress is tracked by `offset` (bytes of `log.disk_buf` already
    /// processed) and `epoch` (to detect non-append-only changes).
    /// Cache file load/save errors are logged and otherwise ignored.
    pub(crate) fn catch_up_with_log_on_disk_entries(&mut self, log: &Log) -> crate::Result<()> {
        // Already up-to-date?
        if self.offset == log.disk_buf.len() as u64 && self.epoch == log.meta.epoch {
            return Ok(());
        }

        // Load from disk.
        // The cache file lives next to the log, named "fold-{name}".
        // `opt_path` is None for in-memory-only logs.
        let opt_path = log
            .dir
            .as_opt_path()
            .map(|p| p.join(format!("fold-{}", self.def.name)));
        if let Some(path) = &opt_path {
            if let Err(e) = self.load_from_file(path) {
                tracing::warn!("cannot load FoldState: {}", e);
            }
        }

        // Invalidate if mismatch.
        // An offset past the end of disk_buf, or a different epoch, means
        // this state no longer matches the log; restart from offset 0.
        if self.offset > log.disk_buf.len() as u64 || self.epoch != log.meta.epoch {
            self.reset();
        }
        self.epoch = log.meta.epoch;

        // Already up-to-date? (after loading from disk).
        // If so, avoid complexities writing back to disk.
        // Note mismatch epoch would reset offset to 0 above.
        if self.offset == log.disk_buf.len() as u64 {
            return Ok(());
        }

        // Catch up by processing remaining entries one by one.
        // Skip entries before `offset` — they are already accumulated.
        let mut iter = log.iter();
        if self.offset > 0 {
            iter.next_offset = self.offset;
        }
        for entry in iter {
            let entry = entry?;
            self.fold.accumulate(entry)?;
        }

        // Set self state as up-to-date, and write to disk.
        // A save failure is non-fatal: the work is just redone next time.
        self.offset = log.disk_buf.len() as u64;
        if let Some(path) = &opt_path {
            if let Err(e) = self.save_to_file(path) {
                tracing::warn!("cannot save FoldState: {}", e);
            }
        }

        Ok(())
    }
211
212    /// Process the next unprocessed entry.
213    ///
214    /// `offset` is the offset to the given entry.
215    /// `next_offset` is the offset to the next entry.
216    ///
217    /// The given entry must be the next one to be processed. All previous
218    /// entries are already processed and none of the entries after the given
219    /// entry are processed.
220    pub(crate) fn process_entry(
221        &mut self,
222        entry: &[u8],
223        offset: u64,
224        next_offset: u64,
225    ) -> crate::Result<()> {
226        if self.offset != offset {
227            return Err(Error::programming(format!(
228                "FoldState got mismatched offset: {:?} != {:?}",
229                self.offset, offset
230            )));
231        }
232        self.fold.accumulate(entry)?;
233        self.offset = next_offset;
234        Ok(())
235    }
236
    /// Reset to the "nothing processed" state: offset 0 and a fresh fold.
    ///
    /// `epoch` is intentionally left untouched; the caller updates it
    /// separately (see `catch_up_with_log_on_disk_entries`).
    fn reset(&mut self) {
        self.offset = 0;
        self.fold = (self.def.create_fold)();
    }
241}
242
243#[cfg(test)]
244mod test {
245    use tempfile::tempdir;
246
247    use super::*;
248
249    #[derive(Debug, Default)]
250    struct ConcatFold(Vec<u8>);
251
252    impl Fold for ConcatFold {
253        fn load(&mut self, state_bytes: &[u8]) -> io::Result<()> {
254            self.0 = state_bytes.to_vec();
255            Ok(())
256        }
257
258        fn dump(&self) -> io::Result<Vec<u8>> {
259            Ok(self.0.clone())
260        }
261
262        fn accumulate(&mut self, entry: &[u8]) -> Result<()> {
263            self.0.extend_from_slice(entry);
264            Ok(())
265        }
266
267        fn as_any(&self) -> &dyn Any {
268            self
269        }
270
271        fn clone_boxed(&self) -> Box<dyn Fold> {
272            Box::new(Self(self.0.clone()))
273        }
274    }
275
276    #[derive(Debug, Default)]
277    struct CountFold(u64);
278
279    impl Fold for CountFold {
280        fn load(&mut self, state_bytes: &[u8]) -> io::Result<()> {
281            let bytes = <[u8; 8]>::try_from(state_bytes).unwrap();
282            let count = u64::from_be_bytes(bytes);
283            self.0 = count;
284            Ok(())
285        }
286
287        fn dump(&self) -> io::Result<Vec<u8>> {
288            Ok(self.0.to_be_bytes().to_vec())
289        }
290
291        fn accumulate(&mut self, _entry: &[u8]) -> Result<()> {
292            self.0 += 1;
293            Ok(())
294        }
295
296        fn as_any(&self) -> &dyn Any {
297            self
298        }
299
300        fn clone_boxed(&self) -> Box<dyn Fold> {
301            Box::new(Self(self.0))
302        }
303    }
304
305    #[test]
306    fn test_fold_state_load_save() {
307        let dir = tempdir().unwrap();
308        let path = dir.path().join("foo");
309        let def = FoldDef::new("foo", || Box::<ConcatFold>::default());
310        let d = |v: &FoldState| format!("{:?}", v);
311
312        let mut state1 = def.empty_state();
313        let mut state2 = def.empty_state();
314
315        // Check empty state round-trip.
316        state1.save_to_file(&path).unwrap();
317        state2.load_from_file(&path).unwrap();
318        assert_eq!(d(&state1), d(&state2));
319
320        // Check some state round-trip.
321        state1.epoch = 10;
322        state1.offset = 20;
323        state1.fold.accumulate(b"abc").unwrap();
324        state1.fold.accumulate(b"def").unwrap();
325        state2.fold.accumulate(b"ghi").unwrap();
326        state1.save_to_file(&path).unwrap();
327        state2.load_from_file(&path).unwrap();
328        assert_eq!(d(&state1), d(&state2));
329    }
330
    /// End-to-end test of fold calculation against a `Log`: in-memory
    /// entries, syncing between clones, cloning with/without dirty
    /// entries, and recovery from corrupted fold cache files.
    #[test]
    fn test_fold_on_log() {
        let dir = tempdir().unwrap();
        let path = dir.path();

        // Prepare 2 logs.
        let opts = crate::log::OpenOptions::new()
            .fold_def("m", || Box::<ConcatFold>::default())
            .fold_def("c", || Box::<CountFold>::default())
            .create(true);
        let mut log1 = opts.open(path).unwrap();
        let mut log2 = log1.try_clone().unwrap();

        // Helper to read fold results. f1: ConcatFold; f2: CountFold.
        // The indexes 0 and 1 match the `fold_def` registration order.
        let f1 = |log: &Log| {
            log.fold(0)
                .unwrap()
                .as_any()
                .downcast_ref::<ConcatFold>()
                .unwrap()
                .0
                .clone()
        };
        let f2 = |log: &Log| {
            log.fold(1)
                .unwrap()
                .as_any()
                .downcast_ref::<CountFold>()
                .unwrap()
                .0
        };

        // Empty logs.
        assert_eq!(f1(&log1), b"");
        assert_eq!(f2(&log2), 0);

        // Different in-memory entries.
        // Pending (unsynced) entries are only visible to their own log.
        log1.append(b"ab").unwrap();
        log1.append(b"cd").unwrap();
        log2.append(b"e").unwrap();
        log2.append(b"f").unwrap();
        assert_eq!(f1(&log1), b"abcd");
        assert_eq!(f2(&log1), 2);
        assert_eq!(f1(&log2), b"ef");
        assert_eq!(f2(&log2), 2);

        // Write to disk. log2 will pick up log1 entries.
        // (log1 synced first, so its entries come first on disk.)
        log1.sync().unwrap();
        log2.sync().unwrap();
        assert_eq!(f1(&log1), b"abcd");
        assert_eq!(f2(&log1), 2);
        assert_eq!(f1(&log2), b"abcdef");
        assert_eq!(f2(&log2), 4);

        // With new in-memory entries.
        log1.append(b"x").unwrap();
        log2.append(b"y").unwrap();
        assert_eq!(f1(&log1), b"abcdx");
        assert_eq!(f2(&log1), 3);
        assert_eq!(f1(&log2), b"abcdefy");
        assert_eq!(f2(&log2), 5);

        // Clone with and without pending entries.
        let log3 = log1.try_clone_without_dirty().unwrap();
        assert_eq!(f1(&log3), b"abcd");
        assert_eq!(f2(&log3), 2);
        let log3 = log1.try_clone().unwrap();
        assert_eq!(f1(&log3), b"abcdx");
        assert_eq!(f2(&log3), 3);

        // Write to disk again.
        log2.sync().unwrap();
        log1.sync().unwrap();
        assert_eq!(f1(&log1), b"abcdefyx");
        assert_eq!(f2(&log1), 6);
        assert_eq!(f1(&log2), b"abcdefy");
        assert_eq!(f2(&log2), 5);

        // Sync with read fast path.
        log2.sync().unwrap();
        assert_eq!(f1(&log2), b"abcdefyx");
        assert_eq!(f2(&log2), 6);

        // Corrupted folds are simply ignored instead of causing errors.
        // "fold-m": checksum mismatch; "fold-c": zero checksum over a
        // single zero byte, which also fails the checksum check.
        fs::write(path.join("fold-m"), b"corruptedcontent").unwrap();
        fs::write(path.join("fold-c"), b"\0\0\0\0\0\0\0\0\0").unwrap();
        let mut log3 = opts.open(path).unwrap();
        assert_eq!(f1(&log3), b"abcdefyx");
        assert_eq!(f2(&log3), 6);
        log3.sync().unwrap();
        assert_eq!(f1(&log3), b"abcdefyx");
        assert_eq!(f2(&log3), 6);
    }
424}