1use std::any::Any;
9use std::fmt::Debug;
10use std::fs;
11use std::io;
12use std::path::Path;
13
14use vlqencoding::VLQDecode;
15use vlqencoding::VLQEncode;
16
17use crate::errors::IoResultExt;
18use crate::log::Log;
19use crate::utils::atomic_write_plain;
20use crate::utils::xxhash;
21use crate::Error;
22use crate::Result;
23
24#[derive(Clone, Debug)]
34pub struct FoldDef {
35 pub(crate) create_fold: fn() -> Box<dyn Fold>,
37
38 pub(crate) name: &'static str,
46}
47
48pub trait Fold: Debug + 'static + Send + Sync {
50 fn load(&mut self, state_bytes: &[u8]) -> io::Result<()>;
53
54 fn dump(&self) -> io::Result<Vec<u8>>;
56
57 fn accumulate(&mut self, entry: &[u8]) -> Result<()>;
59
60 fn as_any(&self) -> &dyn Any;
62
63 fn clone_boxed(&self) -> Box<dyn Fold>;
65}
66
67#[derive(Debug)]
69pub(crate) struct FoldState {
70 pub(crate) epoch: u64,
73
74 pub(crate) offset: u64,
76
77 pub(crate) fold: Box<dyn Fold>,
79
80 def: FoldDef,
82}
83
84impl FoldDef {
85 pub fn new(name: &'static str, create_fold: fn() -> Box<dyn Fold>) -> Self {
89 Self { create_fold, name }
90 }
91
92 pub(crate) fn empty_state(&self) -> FoldState {
93 FoldState {
94 epoch: 0,
95 offset: 0,
96 fold: (self.create_fold)(),
97 def: self.clone(),
98 }
99 }
100}
101
102impl Clone for FoldState {
103 fn clone(&self) -> Self {
104 Self {
105 epoch: self.epoch,
106 offset: self.offset,
107 fold: self.fold.clone_boxed(),
108 def: self.def.clone(),
109 }
110 }
111}
112
113impl FoldState {
114 pub(crate) fn load_from_file(&mut self, path: &Path) -> crate::Result<()> {
115 (|| -> io::Result<()> {
116 let data = fs::read(path)?;
117 let checksum = match data.get(0..8) {
118 Some(h) => u64::from_be_bytes(<[u8; 8]>::try_from(h).unwrap()),
119 None => {
120 return Err(io::Error::new(
121 io::ErrorKind::InvalidData,
122 format!("corrupted FoldState (no checksum): {:?}", data),
123 ));
124 }
125 };
126 if xxhash(&data[8..]) != checksum {
127 return Err(io::Error::new(
128 io::ErrorKind::InvalidData,
129 format!("corrupted FoldState (wrong checksum): {:?}", data),
130 ));
131 }
132 let mut reader = &data[8..];
133 let epoch = reader.read_vlq()?;
134 let offset = reader.read_vlq()?;
135 self.fold.load(reader)?;
136 self.epoch = epoch;
137 self.offset = offset;
138 Ok(())
139 })()
140 .context(path, "cannot read FoldState")
141 }
142
143 pub(crate) fn save_to_file(&self, path: &Path) -> crate::Result<()> {
144 let data = (|| -> io::Result<Vec<u8>> {
145 let mut body = Vec::new();
146 body.write_vlq(self.epoch)?;
147 body.write_vlq(self.offset)?;
148 body.extend_from_slice(&self.fold.dump()?);
149 let checksum = xxhash(&body);
150 let mut data: Vec<u8> = checksum.to_be_bytes().to_vec();
151 data.extend_from_slice(&body);
152 Ok(data)
153 })()
154 .context(path, "cannot prepare FoldState")?;
155 atomic_write_plain(path, &data, false)
156 }
157
158 pub(crate) fn catch_up_with_log_on_disk_entries(&mut self, log: &Log) -> crate::Result<()> {
162 if self.offset == log.disk_buf.len() as u64 && self.epoch == log.meta.epoch {
164 return Ok(());
165 }
166
167 let opt_path = log
169 .dir
170 .as_opt_path()
171 .map(|p| p.join(format!("fold-{}", self.def.name)));
172 if let Some(path) = &opt_path {
173 if let Err(e) = self.load_from_file(path) {
174 tracing::warn!("cannot load FoldState: {}", e);
175 }
176 }
177
178 if self.offset > log.disk_buf.len() as u64 || self.epoch != log.meta.epoch {
180 self.reset();
181 }
182 self.epoch = log.meta.epoch;
183
184 if self.offset == log.disk_buf.len() as u64 {
188 return Ok(());
189 }
190
191 let mut iter = log.iter();
193 if self.offset > 0 {
194 iter.next_offset = self.offset;
195 }
196 for entry in iter {
197 let entry = entry?;
198 self.fold.accumulate(entry)?;
199 }
200
201 self.offset = log.disk_buf.len() as u64;
203 if let Some(path) = &opt_path {
204 if let Err(e) = self.save_to_file(path) {
205 tracing::warn!("cannot save FoldState: {}", e);
206 }
207 }
208
209 Ok(())
210 }
211
212 pub(crate) fn process_entry(
221 &mut self,
222 entry: &[u8],
223 offset: u64,
224 next_offset: u64,
225 ) -> crate::Result<()> {
226 if self.offset != offset {
227 return Err(Error::programming(format!(
228 "FoldState got mismatched offset: {:?} != {:?}",
229 self.offset, offset
230 )));
231 }
232 self.fold.accumulate(entry)?;
233 self.offset = next_offset;
234 Ok(())
235 }
236
237 fn reset(&mut self) {
238 self.offset = 0;
239 self.fold = (self.def.create_fold)();
240 }
241}
242
#[cfg(test)]
mod test {
    use tempfile::tempdir;

    use super::*;

    /// Fold that concatenates the bytes of every entry it sees.
    #[derive(Debug, Default)]
    struct ConcatFold(Vec<u8>);

    impl Fold for ConcatFold {
        fn load(&mut self, state_bytes: &[u8]) -> io::Result<()> {
            self.0 = state_bytes.to_vec();
            Ok(())
        }

        fn dump(&self) -> io::Result<Vec<u8>> {
            Ok(self.0.clone())
        }

        fn accumulate(&mut self, entry: &[u8]) -> Result<()> {
            self.0.extend_from_slice(entry);
            Ok(())
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn clone_boxed(&self) -> Box<dyn Fold> {
            Box::new(Self(self.0.clone()))
        }
    }

    /// Fold that counts the entries it sees. Serialized as 8 big-endian bytes.
    #[derive(Debug, Default)]
    struct CountFold(u64);

    impl Fold for CountFold {
        fn load(&mut self, state_bytes: &[u8]) -> io::Result<()> {
            // The bytes come from a file on disk, so a wrong length is
            // reported as an error instead of panicking.
            let bytes = <[u8; 8]>::try_from(state_bytes).map_err(|_| {
                io::Error::new(
                    io::ErrorKind::InvalidData,
                    format!(
                        "CountFold state must be 8 bytes, got {}",
                        state_bytes.len()
                    ),
                )
            })?;
            self.0 = u64::from_be_bytes(bytes);
            Ok(())
        }

        fn dump(&self) -> io::Result<Vec<u8>> {
            Ok(self.0.to_be_bytes().to_vec())
        }

        fn accumulate(&mut self, _entry: &[u8]) -> Result<()> {
            self.0 += 1;
            Ok(())
        }

        fn as_any(&self) -> &dyn Any {
            self
        }

        fn clone_boxed(&self) -> Box<dyn Fold> {
            Box::new(Self(self.0))
        }
    }

    #[test]
    fn test_count_fold_load_rejects_wrong_length() {
        let mut fold = CountFold::default();
        fold.accumulate(b"x").unwrap();

        // Too short: an error is returned and the counter is unchanged.
        let err = fold.load(b"\0\0\0").unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
        assert_eq!(fold.0, 1);

        // Exactly 8 bytes: decoded as a big-endian u64.
        fold.load(&7u64.to_be_bytes()).unwrap();
        assert_eq!(fold.0, 7);
    }

    #[test]
    fn test_fold_state_load_save() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("foo");
        let def = FoldDef::new("foo", || Box::<ConcatFold>::default());
        // States are compared through their Debug representation.
        let d = |v: &FoldState| format!("{:?}", v);

        let mut state1 = def.empty_state();
        let mut state2 = def.empty_state();

        // Round-trip of an empty state.
        state1.save_to_file(&path).unwrap();
        state2.load_from_file(&path).unwrap();
        assert_eq!(d(&state1), d(&state2));

        // Round-trip of a non-empty state. Whatever state2 accumulated before
        // loading ("ghi") must be replaced by the loaded snapshot.
        state1.epoch = 10;
        state1.offset = 20;
        state1.fold.accumulate(b"abc").unwrap();
        state1.fold.accumulate(b"def").unwrap();
        state2.fold.accumulate(b"ghi").unwrap();
        state1.save_to_file(&path).unwrap();
        state2.load_from_file(&path).unwrap();
        assert_eq!(d(&state1), d(&state2));
    }

    #[test]
    fn test_fold_on_log() {
        let dir = tempdir().unwrap();
        let path = dir.path();

        // Fold 0 ("m") concatenates entries, fold 1 ("c") counts them.
        let opts = crate::log::OpenOptions::new()
            .fold_def("m", || Box::<ConcatFold>::default())
            .fold_def("c", || Box::<CountFold>::default())
            .create(true);
        let mut log1 = opts.open(path).unwrap();
        let mut log2 = log1.try_clone().unwrap();

        // Accessors for the concrete fold values.
        let f1 = |log: &Log| {
            log.fold(0)
                .unwrap()
                .as_any()
                .downcast_ref::<ConcatFold>()
                .unwrap()
                .0
                .clone()
        };
        let f2 = |log: &Log| {
            log.fold(1)
                .unwrap()
                .as_any()
                .downcast_ref::<CountFold>()
                .unwrap()
                .0
        };

        // Empty logs.
        assert_eq!(f1(&log1), b"");
        assert_eq!(f2(&log2), 0);

        // Appended (not yet synced) entries are reflected per log handle.
        log1.append(b"ab").unwrap();
        log1.append(b"cd").unwrap();
        log2.append(b"e").unwrap();
        log2.append(b"f").unwrap();
        assert_eq!(f1(&log1), b"abcd");
        assert_eq!(f2(&log1), 2);
        assert_eq!(f1(&log2), b"ef");
        assert_eq!(f2(&log2), 2);

        // After both sync, log2 (synced last) is expected to see log1's
        // entries followed by its own.
        log1.sync().unwrap();
        log2.sync().unwrap();
        assert_eq!(f1(&log1), b"abcd");
        assert_eq!(f2(&log1), 2);
        assert_eq!(f1(&log2), b"abcdef");
        assert_eq!(f2(&log2), 4);

        // More unsynced appends on top of the synced state.
        log1.append(b"x").unwrap();
        log2.append(b"y").unwrap();
        assert_eq!(f1(&log1), b"abcdx");
        assert_eq!(f2(&log1), 3);
        assert_eq!(f1(&log2), b"abcdefy");
        assert_eq!(f2(&log2), 5);

        // Cloning with and without the unsynced entries.
        let log3 = log1.try_clone_without_dirty().unwrap();
        assert_eq!(f1(&log3), b"abcd");
        assert_eq!(f2(&log3), 2);
        let log3 = log1.try_clone().unwrap();
        assert_eq!(f1(&log3), b"abcdx");
        assert_eq!(f2(&log3), 3);

        // Sync in the opposite order: log1 now ends up with everything.
        log2.sync().unwrap();
        log1.sync().unwrap();
        assert_eq!(f1(&log1), b"abcdefyx");
        assert_eq!(f2(&log1), 6);
        assert_eq!(f1(&log2), b"abcdefy");
        assert_eq!(f2(&log2), 5);

        // A further sync lets log2 catch up as well.
        log2.sync().unwrap();
        assert_eq!(f1(&log2), b"abcdefyx");
        assert_eq!(f2(&log2), 6);

        // Corrupted snapshot files must not affect the fold values seen
        // through a freshly opened log.
        fs::write(path.join("fold-m"), b"corruptedcontent").unwrap();
        fs::write(path.join("fold-c"), b"\0\0\0\0\0\0\0\0\0").unwrap();
        let mut log3 = opts.open(path).unwrap();
        assert_eq!(f1(&log3), b"abcdefyx");
        assert_eq!(f2(&log3), 6);
        log3.sync().unwrap();
        assert_eq!(f1(&log3), b"abcdefyx");
        assert_eq!(f2(&log3), 6);
    }
}