Skip to main content

ygw/
record_file.rs

1//! File recording and replay
2//!
3//! The recording files are composed of a header and multiple segments. The segments are compressed with zstd.
4//! The format of the file is:
5//! part 1 - header
6//! - MAGIC - the string YGW_REC (7 bytes)
7//! - format version (1 byte) - current version is 1
8//! - first record number - first_rec_num  (8 bytes) - this has to match the filename
9//! - maximum number of segments (4 bytes)
10//!
11//! then for each segment 9 bytes segment information consisting of:
12//! - metadata (1 byte) - currently only if the segment contains parameter definitions
13//! - segment size (4 bytes) - used to be able to skip over entire segments when replaying from the middle of the file
14//! - the number of the last record in the segment, relative to first_rec_num (4 bytes)
15//!
//! part 2 - data - formed by segments stored one after the other;
17//! the segments are zstd compressed data composed of records.
18//! Each record consists of:
19//! - record length  (without the length itself) (4 bytes)
20//! - record number relative to first_rec_num (4 bytes)
21//! - record data (length - 4)
22//!
//! The trick with the zstd decoder is that it can decode segments concatenated one after another:
//! zstd itself has the concept of a frame, so our segments are basically zstd frames.
25use std::{
26    fs::{File, OpenOptions},
27    io::{BufReader, Read, Seek, SeekFrom, Write},
28    path::Path,
29};
30
31use crate::{Result, YgwError};
32use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
33use zstd;
34
/// Largest value accepted for the length field of a record (8 MiB).
const MAX_REC_SIZE: u32 = 8 << 20;

/// Size in bytes of one segment info entry of the header:
/// metadata (1) + segment size (4) + last record number (4).
const SEG_INFO_SIZE: usize = 1 + 4 + 4;

/// Magic string stored at the very beginning of each recording file.
const MAGIC: &[u8] = b"YGW_REC";

/// Version of the file format.
const FORMAT_VERSION: u8 = 1;

/// Limit on the number of segments of one file; checked both when a file is created and when it is opened.
const MAX_SEGMENTS_PER_FILE: u32 = 1000;

/// Offset in the file where the segment info entries start:
/// MAGIC (7) + format version (1) + first record number (8) + maximum number of segments (4).
const SEG_INFO_START: u64 = 7 + 1 + 8 + 4;
49
50#[derive(Debug)]
51struct SegmentInfo {
52    /// true if the segment contains parameter definitions
53    has_pdef: bool,
54    /// the size in bytes of the segment (only known when the segment is closed)
55    size: u32,
56    /// the number of the last record in the segment, relative to the first record number of the file
57    last_rn: u32,
58}
59impl SegmentInfo {
60    fn to_bytes(&self) -> [u8; SEG_INFO_SIZE] {
61        let mut bytes = [0u8; SEG_INFO_SIZE];
62
63        bytes[0] = if self.has_pdef { 1 } else { 0 };
64
65        let offset_bytes = self.size.to_be_bytes();
66        bytes[1..5].copy_from_slice(&offset_bytes);
67
68        let num_rec_bytes = self.last_rn.to_be_bytes();
69        bytes[5..9].copy_from_slice(&num_rec_bytes);
70
71        bytes
72    }
73
74    fn from_bytes(bytes: &[u8; SEG_INFO_SIZE]) -> Self {
75        let has_pdef = bytes[0] != 0;
76
77        let size = u32::from_be_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
78
79        let last_rn = u32::from_be_bytes([bytes[5], bytes[6], bytes[7], bytes[8]]);
80
81        Self {
82            has_pdef,
83            size,
84            last_rn,
85        }
86    }
87}
88
89/// recorder for one file
90pub struct FileRecorder<'a> {
91    /// maximum number of segments in this file.
92    /// each segment will have a preallocated segment info at the beginning of the file,
93    /// so this number cannot change once the file has been created
94    max_num_segments: u32,
95    /// first record number
96    first_rec_num: u64,
97    /// encoder for the current segment.
98    /// None it means the file is full, no more data can be added
99    encoder: Option<zstd::Encoder<'a, File>>,
100    /// number of the segment that is being recorded
101    seg_num: u32,
102    /// true if the current segment has parameter definitions
103    has_pdef: bool,
104    /// number of records in the current segment
105    num_records: u32,
106    /// offset where the current segment starts
107    offset: u64,
108    file: File,
109    /// maximum number of records in each segment
110    max_nrps: u32,
111    /// number of the last record written (relative to the first_rec_num)
112    pub last_rn: u32,
113}
114
115impl<'a> FileRecorder<'_> {
116    ///Create a new file recorder
117    /// # Arguments
118    ///
119    /// * `path` - The path of the file to be recorded.
120    /// * `first_rec_num` - The first record number. Al the record numbers in the file are relative to this one
121    /// * `max_num_seg` - The maximum number of segments that can be stored in the file. This determines the size of the header
122    /// * `max_nrps` - The maximum number of records per segment
123    pub fn new(
124        path: &Path,
125        first_rec_num: u64,
126        max_num_segments: u32,
127        max_nrps: u32,
128    ) -> Result<Self> {
129        assert!(max_num_segments < MAX_SEGMENTS_PER_FILE);
130
131        let mut file = OpenOptions::new()
132            .create(true)
133            .write(true)
134            .open(path)
135            .map_err(|e| YgwError::IOError(format!("Error opening {}", path.display()), e))?;
136
137        file.write_all(&MAGIC)?;
138        file.write_u8(FORMAT_VERSION)?;
139        file.write_u64::<BigEndian>(first_rec_num)?;
140        file.write_u32::<BigEndian>(max_num_segments)?;
141        let zeros = vec![0u8; (max_num_segments as usize) * SEG_INFO_SIZE];
142        file.write_all(&zeros)?;
143        file.sync_data()?;
144
145        let f = OpenOptions::new()
146            .append(true)
147            .open(path)
148            .map_err(|e| YgwError::IOError(format!("Error opening {}", path.display()), e))?;
149
150        let encoder = Some(zstd::Encoder::new(f, 0)?);
151
152        Ok(FileRecorder {
153            max_num_segments,
154            first_rec_num,
155            encoder,
156            seg_num: 0,
157            has_pdef: false,
158            num_records: 0,
159            offset: get_data_start_offset(max_num_segments),
160            file,
161            max_nrps,
162            last_rn: 0,
163        })
164    }
165
166    /// append the record to the end of the file
167    pub fn append(&mut self, rn: u64, msg: &[u8], is_pdef: bool) -> Result<()> {
168        let Some(ref mut encoder) = self.encoder else {
169            return Err(YgwError::RecordingFileFull(self.max_num_segments));
170        };
171
172        let local_rn =  (rn-self.first_rec_num).try_into().expect(&format!("the record number {rn} is too high compared with the first record number of the file {}", self.first_rec_num));
173
174        log::trace!("Appending rn={}, local_rn={}", rn, local_rn);
175
176        encoder.write_u32::<BigEndian>(4 + msg.len() as u32)?;
177        encoder.write_u32::<BigEndian>(local_rn)?;
178        encoder.write_all(msg)?;
179        self.has_pdef |= is_pdef;
180        self.num_records += 1;
181        self.last_rn = local_rn;
182
183        if self.num_records >= self.max_nrps {
184            self.end_segment(true)?;
185        }
186
187        Ok(())
188    }
189
190    pub fn is_full(&self) -> bool {
191        return self.encoder.is_none();
192    }
193
194    pub fn flush(&mut self) -> Result<()> {
195        if let Some(ref mut encoder) = self.encoder {
196            encoder.flush()?
197        }
198        Ok(())
199    }
200
201    fn end_segment(&mut self, start_next: bool) -> Result<()> {
202        let mut f = if let Some(encoder) = self.encoder.take() {
203            encoder.finish()?
204        } else {
205            panic!("unexpected state");
206        };
207        let offset = f.seek(SeekFrom::Current(0))?;
208        log::debug!(
209            "Ending segment {} at offset {}; last_rn: {}",
210            self.seg_num,
211            offset,
212            self.last_rn
213        );
214
215        let size = seg_size(offset, self.offset);
216
217        let seg_info = SegmentInfo {
218            has_pdef: self.has_pdef,
219            size,
220            last_rn: self.last_rn,
221        };
222        let seg_info_offset = SEG_INFO_START + (self.seg_num as u64) * (SEG_INFO_SIZE as u64);
223        self.file.seek(SeekFrom::Start(seg_info_offset))?;
224        self.file.write_all(&seg_info.to_bytes())?;
225        self.file.sync_data()?;
226
227        if start_next {
228            //next segment
229            self.seg_num += 1;
230            if self.seg_num >= self.max_num_segments {
231                //file full
232                return Ok(());
233            }
234            self.offset = offset;
235            self.has_pdef = false;
236            self.num_records = 0;
237
238            self.encoder = Some(zstd::Encoder::new(f, 0)?);
239        }
240        Ok(())
241    }
242}
243
244fn get_data_start_offset(num_segments: u32) -> u64 {
245    return SEG_INFO_START + (num_segments as u64) * (SEG_INFO_SIZE as u64);
246}
/// Returns the size of the segment located between `offset2` (start) and `offset1` (end).
/// Panics if the size does not fit into 32 bits.
fn seg_size(offset1: u64, offset2: u64) -> u32 {
    let len = offset1 - offset2;
    u32::try_from(len).expect("segment too large")
}
250
251impl<'a> Drop for FileRecorder<'_> {
252    fn drop(&mut self) {
253        if !self.is_full() && self.num_records > 0 {
254            if let Err(e) = self.end_segment(false) {
255                log::error!("Error in closing the file {:?}", e);
256            }
257        }
258    }
259}
260
261pub struct FilePlayer {
262    first_rec_num: u64,
263    segments: Vec<SegmentInfo>,
264    file: File,
265}
266
267impl<'a> FilePlayer {
268    /// open a file for replay
269    /// An CorruptedRecordingFile may be returned if the header of the file is not correct:
270    /// - magic string not present
271    /// - format version not matching the expected version (1)
272    /// - first record number not matching  the argument of the function
273    /// - maximum number of segments is too large
274    /// - the size of the segment data according to the header is smaller than the size of the file
275    /// - the last record numbers of the segments are not
276    ///
277    /// If the size of the file is larger than the sum of the segments from the header, then an attempt is to parse the
278    /// last segment to find its size. This is because if the recording is not closed properly,
279    ///  the last segment information will not be present in the header. This is not considered a data corruption.
280    pub fn open(path: &Path, first_rec_num: u64) -> Result<Self> {
281        let mut file = OpenOptions::new().read(true).write(false).open(path)?;
282        let mut magic = vec![0u8; MAGIC.len()];
283        file.read_exact(&mut magic)?;
284        if magic != MAGIC {
285            return Err(crate::YgwError::CorruptedRecordingFile(format!(
286                "Incorrect magic read from {}: {:?}",
287                path.display(),
288                magic
289            )));
290        }
291        let format = file.read_u8()?;
292        if format != FORMAT_VERSION {
293            return Err(crate::YgwError::CorruptedRecordingFile(format!(
294                "Incorrect format version read from {}: {} (expected {})",
295                path.display(),
296                format,
297                FORMAT_VERSION
298            )));
299        }
300
301        let frn = file.read_u64::<BigEndian>()?;
302        if frn != first_rec_num {
303            return Err(crate::YgwError::CorruptedRecordingFile(format!(
304                "Incorrect first record number read from from {}: {} (expected {})",
305                path.display(),
306                frn,
307                first_rec_num
308            )));
309        }
310
311        let max_num_segments = file.read_u32::<BigEndian>()?;
312
313        if max_num_segments > MAX_SEGMENTS_PER_FILE {
314            return Err(crate::YgwError::CorruptedRecordingFile(format!(
315                "The file {} contains {} segments. Maximum expected is {}",
316                path.display(),
317                max_num_segments,
318                MAX_SEGMENTS_PER_FILE
319            )));
320        }
321        let mut segments = Vec::with_capacity(max_num_segments as usize);
322
323        // read the segment information
324        for _ in 0..max_num_segments {
325            let mut buf = [0u8; SEG_INFO_SIZE];
326            file.read_exact(&mut buf)?;
327            segments.push(SegmentInfo::from_bytes(&buf));
328        }
329        verify_consistency(&segments)?;
330
331        let hdr_size = get_data_start_offset(max_num_segments);
332        let file_size = file.metadata()?.len();
333        let seg_size: u64 = segments.iter().map(|s| s.size as u64).sum();
334        log::debug!(
335            "{}: file_size: {file_size} hdr_size: {hdr_size} segment_data_size: {seg_size} ",
336            path.display(),
337        );
338
339        let mut player = Self {
340            first_rec_num,
341            segments,
342            file,
343        };
344
345        if hdr_size + seg_size < file_size {
346            if let Some(idx) = player.segments.iter().position(|s| s.size == 0) {
347                player.recover_last_segment(idx)?;
348            } else {
349                return Err(YgwError::CorruptedRecordingFile(format!("{}: header + size of the segment data is smaller than the size of the file: {hdr_size} + {seg_size} < {file_size}) yet no segment is incomplete according to the header", path.display())));
350            }
351        } else if hdr_size + seg_size > file_size {
352            return Err(YgwError::CorruptedRecordingFile(format!("{}: header + size of the segment data is greater than the size of the file: {hdr_size} + {seg_size} > {file_size})", path.display())));
353        }
354
355        Ok(player)
356    }
357
358    /// parse the last segment to find its size and last record number
359    fn recover_last_segment(&mut self, idx: usize) -> Result<()> {
360        let offset = self
361            .segments
362            .iter()
363            .take(idx)
364            .map(|s| s.size as u64)
365            .sum::<u64>()
366            + self.get_data_start_offset();
367        let mut file = self.file.try_clone()?;
368
369        file.seek(SeekFrom::Start(offset))?;
370        let mut decoder = zstd::Decoder::new(file)?;
371        let mut last_rn = 0;
372        while let Ok((rn, _)) = read_record(&mut decoder) {
373            last_rn = rn;
374        }
375        let mut file = decoder.finish();
376        let offset1 = file.stream_position()?;
377
378        self.segments[idx].last_rn = last_rn;
379        self.segments[idx].size = seg_size(offset1, offset);
380        //TODO
381        //self.segments[idx].has_pdef =
382
383        Ok(())
384    }
385
386    /// returns an iterator that iterates over all records
387    /// the iterator does not take into account the segment information
388    /// it just uses the zstd decoder to decode all segments
389    pub fn iter(&mut self) -> Result<FilePlayerIterator> {
390        let mut file = self.file.try_clone()?;
391        file.seek(SeekFrom::Start(get_data_start_offset(
392            self.segments.len() as u32
393        )))?;
394        let decoder = Some(zstd::Decoder::new(file)?);
395
396        Ok(FilePlayerIterator { fr: self, decoder })
397    }
398
399    /// returns an iterator that iterates over all records starting with seg_num segment
400    /// the position of the segment is based on the correct metadata information for the previous segments
401    pub fn iter_from_segment(&mut self, seg_num: u32) -> Result<FilePlayerIterator> {
402        let mut file = self.file.try_clone()?;
403
404        let offset = self
405            .segments
406            .iter()
407            .take(seg_num as usize)
408            .map(|s| s.size as u64)
409            .sum::<u64>()
410            + self.get_data_start_offset();
411        file.seek(SeekFrom::Start(offset))?;
412
413        let decoder = Some(zstd::Decoder::new(file)?);
414
415        Ok(FilePlayerIterator { fr: self, decoder })
416    }
417
418    /// Returns an iterator that starts from the segment that contains or comes after the given record number (absolute `rn`).
419    /// If `rn` is outside the file range, returns an empty iterator.
420    pub fn iter_from_segment_with_rn(&mut self, rn: u64) -> Result<FilePlayerIterator> {
421        if rn < self.first_rec_num {
422            // Start from the beginning
423            return self.iter_from_segment(0);
424        }
425        let rel_rn = (rn - self.first_rec_num) as u32;
426
427        // Find the first segment whose last_rn >= rel_rn
428        let mut seg_num = None;
429        for (i, seg) in self.segments.iter().enumerate() {
430            if seg.size == 0 {
431                break; // no valid segments after this point
432            }
433            if seg.last_rn >= rel_rn {
434                seg_num = Some(i as u32);
435                break;
436            }
437        }
438
439        if let Some(seg_num) = seg_num {
440            self.iter_from_segment(seg_num)
441        } else {
442            // rn is after the last record in the file
443            Ok(FilePlayerIterator {
444                fr: self,
445                decoder: None,
446            })
447        }
448    }
449
450    pub fn get_data_start_offset(&self) -> u64 {
451        get_data_start_offset(self.segments.len() as u32)
452    }
453
454    pub fn get_last_record_number(&self) -> Option<u64> {
455        self.segments
456            .iter()
457            .rev()
458            .find(|s| s.size > 0)
459            .map(|s| s.last_rn)
460            .map(|rn| rn as u64 + self.first_rec_num)
461    }
462}
463
464/// return an error:
465///  - if there is a segment of size>0 following a segment of size 0
466///  - if the last record numbers are not increasing
467fn verify_consistency(segments: &[SegmentInfo]) -> Result<()> {
468    for i in 1..segments.len() {
469        if segments[i - 1].size == 0 && segments[i].size > 0 {
470            return Err(YgwError::CorruptedRecordingFile(format!(
471                "The segment {i} of size>0 is following a segment of size 0"
472            )));
473        }
474        if segments[i].size > 0 && segments[i].last_rn <= segments[i - 1].last_rn {
475            return Err(YgwError::CorruptedRecordingFile(format!(
476                "The last record numbers are not increasing"
477            )));
478        }
479    }
480    Ok(())
481}
482
483pub struct FilePlayerIterator<'b> {
484    fr: &'b FilePlayer,
485    decoder: Option<zstd::Decoder<'b, BufReader<File>>>,
486}
487
488impl<'b> Iterator for FilePlayerIterator<'b> {
489    type Item = Result<(u64, Vec<u8>)>;
490
491    fn next(&mut self) -> Option<Self::Item> {
492        match self.decoder {
493            None => None,
494            Some(ref mut decoder) => match read_record(decoder) {
495                Ok((rec_num, data)) => Some(Ok((self.fr.first_rec_num + rec_num as u64, data))),
496                Err(YgwError::IOError(_, e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
497                    self.decoder.take();
498                    None
499                }
500                Err(e) => Some(Err(e)),
501            },
502        }
503    }
504}
505
506fn read_record(decoder: &mut zstd::Decoder<BufReader<File>>) -> Result<(u32, Vec<u8>)> {
507    let size = decoder.read_u32::<BigEndian>()?;
508    if size > MAX_REC_SIZE {
509        return Err(YgwError::DecodeError(format!(
510            "Found record larger than the maximum allowed ({} > {})",
511            size, MAX_REC_SIZE
512        )));
513    }
514    if size < 4 {
515        return Err(YgwError::DecodeError(format!(
516            "Invalid record size {} (expected at least 4)",
517            size
518        )));
519    }
520    let rec_num = decoder.read_u32::<BigEndian>()?;
521    let mut buf = vec![0; size as usize - 4];
522    decoder.read_exact(&mut buf)?;
523
524    Ok((rec_num, buf))
525}
526
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Result;
    use tempfile::tempdir;

    /// Records into a file with 2 segments of 2 records each and verifies after each step
    /// that a player opened on the same file sees all the records written so far.
    #[test]
    fn test1() {
        let tmp_dir = tempdir().unwrap();
        let path = tmp_dir.path().join("test1");
        let data = vec![1u8; 10];

        let mut recorder = FileRecorder::new(&path, 1000, 2, 2).unwrap();

        //first_rec_num does not correspond to the recording
        assert!(FilePlayer::open(&path, 300).is_err());

        // nothing has been recorded yet
        assert_replay(&path, &[], &data);

        recorder.append(1000, &data, false).unwrap();
        recorder.flush().unwrap();
        {
            let mut player = FilePlayer::open(&path, 1000).unwrap();
            assert_eq!(player.first_rec_num, 1000);
            assert_eq!(player.segments.len(), 2);

            let mut it = player.iter().unwrap();
            check_equals(it.next(), 1000, &data);
            assert!(it.next().is_none());
        }

        // the second record completes the first segment
        recorder.append(2000, &data, false).unwrap();
        recorder.flush().unwrap();
        assert_replay(&path, &[1000, 2000], &data);

        // the file becomes full when the second segment is complete
        recorder.append(3000, &data, false).unwrap();
        assert!(!recorder.is_full());
        recorder.append(3001, &data, false).unwrap();
        assert!(recorder.is_full());
        assert_replay(&path, &[1000, 2000, 3000, 3001], &data);
    }

    /// Opens the file and verifies that it contains exactly the records with the given numbers,
    /// each of them having the given data.
    fn assert_replay(path: &Path, expected_rec_nums: &[u64], expected_data: &[u8]) {
        let mut player = FilePlayer::open(path, 1000).unwrap();
        let mut it = player.iter().unwrap();
        for &rec_num in expected_rec_nums {
            check_equals(it.next(), rec_num, expected_data);
        }
        assert!(it.next().is_none());
    }

    /// Verifies that the item returned by the iterator is a record with the expected number and data.
    fn check_equals(
        actual: Option<Result<(u64, Vec<u8>)>>,
        expected_rec_num: u64,
        expected_data: &[u8],
    ) {
        match &actual {
            Some(Ok((rec_num, data))) => {
                assert_eq!(*rec_num, expected_rec_num);
                assert_eq!(data, expected_data);
            }
            _ => panic!(
                "did not match, expected ({expected_rec_num}, {:?})  but got {:?}",
                expected_data, actual
            ),
        }
    }
}