1use std::{
26 fs::{File, OpenOptions},
27 io::{BufReader, Read, Seek, SeekFrom, Write},
28 path::Path,
29};
30
31use crate::{Result, YgwError};
32use byteorder::{BigEndian, ReadBytesExt, WriteBytesExt};
33use zstd;
34
/// Largest value accepted for the size field of a record; `read_record`
/// rejects anything above it.
const MAX_REC_SIZE: u32 = 8 * 1024 * 1024;

/// Size in bytes of one serialized `SegmentInfo` entry:
/// 1 flag byte + 4 bytes of size + 4 bytes of last record number.
const SEG_INFO_SIZE: usize = 9;

/// Magic bytes written at the very beginning of a recording file and checked
/// when the file is opened for playback.
const MAGIC: &[u8] = "YGW_REC".as_bytes();

/// Format version byte written right after the magic; the player refuses any
/// other value.
const FORMAT_VERSION: u8 = 1;

/// Limit on the number of segments of a file: the recorder asserts that it
/// stays below this value and the player rejects headers announcing more.
const MAX_SEGMENTS_PER_FILE: u32 = 1000;

/// File offset of the first segment info entry, i.e. the size of the fixed
/// part of the header: 7 bytes of magic + 1 byte of version + 8 bytes of first
/// record number + 4 bytes of segment count.
const SEG_INFO_START: u64 = 20;
49
50#[derive(Debug)]
51struct SegmentInfo {
52 has_pdef: bool,
54 size: u32,
56 last_rn: u32,
58}
59impl SegmentInfo {
60 fn to_bytes(&self) -> [u8; SEG_INFO_SIZE] {
61 let mut bytes = [0u8; SEG_INFO_SIZE];
62
63 bytes[0] = if self.has_pdef { 1 } else { 0 };
64
65 let offset_bytes = self.size.to_be_bytes();
66 bytes[1..5].copy_from_slice(&offset_bytes);
67
68 let num_rec_bytes = self.last_rn.to_be_bytes();
69 bytes[5..9].copy_from_slice(&num_rec_bytes);
70
71 bytes
72 }
73
74 fn from_bytes(bytes: &[u8; SEG_INFO_SIZE]) -> Self {
75 let has_pdef = bytes[0] != 0;
76
77 let size = u32::from_be_bytes([bytes[1], bytes[2], bytes[3], bytes[4]]);
78
79 let last_rn = u32::from_be_bytes([bytes[5], bytes[6], bytes[7], bytes[8]]);
80
81 Self {
82 has_pdef,
83 size,
84 last_rn,
85 }
86 }
87}
88
/// Writer of a recording file: a header (magic, version, first record number,
/// segment table) followed by segments, each produced by its own zstd encoder.
pub struct FileRecorder<'a> {
    /// Number of entries reserved in the segment table of the header.
    max_num_segments: u32,
    /// Record number written in the header; record numbers are stored in the
    /// file relative to it.
    first_rec_num: u64,
    /// Encoder of the segment currently being written; `None` once all the
    /// segments of the file have been used.
    encoder: Option<zstd::Encoder<'a, File>>,
    /// Index of the segment currently being written.
    seg_num: u32,
    /// Whether a record flagged `is_pdef` was appended to the current segment.
    has_pdef: bool,
    /// Number of records appended to the current segment.
    num_records: u32,
    /// File offset at which the current segment starts.
    offset: u64,
    /// Handle used to write the header and to update the segment table.
    file: File,
    /// Number of records after which the current segment is ended.
    max_nrps: u32,
    /// Relative record number of the last appended record.
    pub last_rn: u32,
}
114
115impl<'a> FileRecorder<'_> {
116 pub fn new(
124 path: &Path,
125 first_rec_num: u64,
126 max_num_segments: u32,
127 max_nrps: u32,
128 ) -> Result<Self> {
129 assert!(max_num_segments < MAX_SEGMENTS_PER_FILE);
130
131 let mut file = OpenOptions::new()
132 .create(true)
133 .write(true)
134 .open(path)
135 .map_err(|e| YgwError::IOError(format!("Error opening {}", path.display()), e))?;
136
137 file.write_all(&MAGIC)?;
138 file.write_u8(FORMAT_VERSION)?;
139 file.write_u64::<BigEndian>(first_rec_num)?;
140 file.write_u32::<BigEndian>(max_num_segments)?;
141 let zeros = vec![0u8; (max_num_segments as usize) * SEG_INFO_SIZE];
142 file.write_all(&zeros)?;
143 file.sync_data()?;
144
145 let f = OpenOptions::new()
146 .append(true)
147 .open(path)
148 .map_err(|e| YgwError::IOError(format!("Error opening {}", path.display()), e))?;
149
150 let encoder = Some(zstd::Encoder::new(f, 0)?);
151
152 Ok(FileRecorder {
153 max_num_segments,
154 first_rec_num,
155 encoder,
156 seg_num: 0,
157 has_pdef: false,
158 num_records: 0,
159 offset: get_data_start_offset(max_num_segments),
160 file,
161 max_nrps,
162 last_rn: 0,
163 })
164 }
165
166 pub fn append(&mut self, rn: u64, msg: &[u8], is_pdef: bool) -> Result<()> {
168 let Some(ref mut encoder) = self.encoder else {
169 return Err(YgwError::RecordingFileFull(self.max_num_segments));
170 };
171
172 let local_rn = (rn-self.first_rec_num).try_into().expect(&format!("the record number {rn} is too high compared with the first record number of the file {}", self.first_rec_num));
173
174 log::trace!("Appending rn={}, local_rn={}", rn, local_rn);
175
176 encoder.write_u32::<BigEndian>(4 + msg.len() as u32)?;
177 encoder.write_u32::<BigEndian>(local_rn)?;
178 encoder.write_all(msg)?;
179 self.has_pdef |= is_pdef;
180 self.num_records += 1;
181 self.last_rn = local_rn;
182
183 if self.num_records >= self.max_nrps {
184 self.end_segment(true)?;
185 }
186
187 Ok(())
188 }
189
190 pub fn is_full(&self) -> bool {
191 return self.encoder.is_none();
192 }
193
194 pub fn flush(&mut self) -> Result<()> {
195 if let Some(ref mut encoder) = self.encoder {
196 encoder.flush()?
197 }
198 Ok(())
199 }
200
201 fn end_segment(&mut self, start_next: bool) -> Result<()> {
202 let mut f = if let Some(encoder) = self.encoder.take() {
203 encoder.finish()?
204 } else {
205 panic!("unexpected state");
206 };
207 let offset = f.seek(SeekFrom::Current(0))?;
208 log::debug!(
209 "Ending segment {} at offset {}; last_rn: {}",
210 self.seg_num,
211 offset,
212 self.last_rn
213 );
214
215 let size = seg_size(offset, self.offset);
216
217 let seg_info = SegmentInfo {
218 has_pdef: self.has_pdef,
219 size,
220 last_rn: self.last_rn,
221 };
222 let seg_info_offset = SEG_INFO_START + (self.seg_num as u64) * (SEG_INFO_SIZE as u64);
223 self.file.seek(SeekFrom::Start(seg_info_offset))?;
224 self.file.write_all(&seg_info.to_bytes())?;
225 self.file.sync_data()?;
226
227 if start_next {
228 self.seg_num += 1;
230 if self.seg_num >= self.max_num_segments {
231 return Ok(());
233 }
234 self.offset = offset;
235 self.has_pdef = false;
236 self.num_records = 0;
237
238 self.encoder = Some(zstd::Encoder::new(f, 0)?);
239 }
240 Ok(())
241 }
242}
243
244fn get_data_start_offset(num_segments: u32) -> u64 {
245 return SEG_INFO_START + (num_segments as u64) * (SEG_INFO_SIZE as u64);
246}
/// Returns the number of bytes between `offset2` (start) and `offset1` (end)
/// as a segment size.
///
/// # Panics
/// Panics with "segment too large" if the difference does not fit in 32 bits.
fn seg_size(offset1: u64, offset2: u64) -> u32 {
    let span = offset1 - offset2;
    u32::try_from(span).expect("segment too large")
}
250
251impl<'a> Drop for FileRecorder<'_> {
252 fn drop(&mut self) {
253 if !self.is_full() && self.num_records > 0 {
254 if let Err(e) = self.end_segment(false) {
255 log::error!("Error in closing the file {:?}", e);
256 }
257 }
258 }
259}
260
/// Reader of a recording file.
pub struct FilePlayer {
    /// First record number of the file; `open` checks it against the header
    /// and iterators add it to the relative record numbers they read.
    first_rec_num: u64,
    /// Segment table read from the header, possibly completed by
    /// `recover_last_segment` for a segment that was not ended.
    segments: Vec<SegmentInfo>,
    /// Read handle on the file; it is cloned for each iterator.
    file: File,
}
266
267impl<'a> FilePlayer {
268 pub fn open(path: &Path, first_rec_num: u64) -> Result<Self> {
281 let mut file = OpenOptions::new().read(true).write(false).open(path)?;
282 let mut magic = vec![0u8; MAGIC.len()];
283 file.read_exact(&mut magic)?;
284 if magic != MAGIC {
285 return Err(crate::YgwError::CorruptedRecordingFile(format!(
286 "Incorrect magic read from {}: {:?}",
287 path.display(),
288 magic
289 )));
290 }
291 let format = file.read_u8()?;
292 if format != FORMAT_VERSION {
293 return Err(crate::YgwError::CorruptedRecordingFile(format!(
294 "Incorrect format version read from {}: {} (expected {})",
295 path.display(),
296 format,
297 FORMAT_VERSION
298 )));
299 }
300
301 let frn = file.read_u64::<BigEndian>()?;
302 if frn != first_rec_num {
303 return Err(crate::YgwError::CorruptedRecordingFile(format!(
304 "Incorrect first record number read from from {}: {} (expected {})",
305 path.display(),
306 frn,
307 first_rec_num
308 )));
309 }
310
311 let max_num_segments = file.read_u32::<BigEndian>()?;
312
313 if max_num_segments > MAX_SEGMENTS_PER_FILE {
314 return Err(crate::YgwError::CorruptedRecordingFile(format!(
315 "The file {} contains {} segments. Maximum expected is {}",
316 path.display(),
317 max_num_segments,
318 MAX_SEGMENTS_PER_FILE
319 )));
320 }
321 let mut segments = Vec::with_capacity(max_num_segments as usize);
322
323 for _ in 0..max_num_segments {
325 let mut buf = [0u8; SEG_INFO_SIZE];
326 file.read_exact(&mut buf)?;
327 segments.push(SegmentInfo::from_bytes(&buf));
328 }
329 verify_consistency(&segments)?;
330
331 let hdr_size = get_data_start_offset(max_num_segments);
332 let file_size = file.metadata()?.len();
333 let seg_size: u64 = segments.iter().map(|s| s.size as u64).sum();
334 log::debug!(
335 "{}: file_size: {file_size} hdr_size: {hdr_size} segment_data_size: {seg_size} ",
336 path.display(),
337 );
338
339 let mut player = Self {
340 first_rec_num,
341 segments,
342 file,
343 };
344
345 if hdr_size + seg_size < file_size {
346 if let Some(idx) = player.segments.iter().position(|s| s.size == 0) {
347 player.recover_last_segment(idx)?;
348 } else {
349 return Err(YgwError::CorruptedRecordingFile(format!("{}: header + size of the segment data is smaller than the size of the file: {hdr_size} + {seg_size} < {file_size}) yet no segment is incomplete according to the header", path.display())));
350 }
351 } else if hdr_size + seg_size > file_size {
352 return Err(YgwError::CorruptedRecordingFile(format!("{}: header + size of the segment data is greater than the size of the file: {hdr_size} + {seg_size} > {file_size})", path.display())));
353 }
354
355 Ok(player)
356 }
357
358 fn recover_last_segment(&mut self, idx: usize) -> Result<()> {
360 let offset = self
361 .segments
362 .iter()
363 .take(idx)
364 .map(|s| s.size as u64)
365 .sum::<u64>()
366 + self.get_data_start_offset();
367 let mut file = self.file.try_clone()?;
368
369 file.seek(SeekFrom::Start(offset))?;
370 let mut decoder = zstd::Decoder::new(file)?;
371 let mut last_rn = 0;
372 while let Ok((rn, _)) = read_record(&mut decoder) {
373 last_rn = rn;
374 }
375 let mut file = decoder.finish();
376 let offset1 = file.stream_position()?;
377
378 self.segments[idx].last_rn = last_rn;
379 self.segments[idx].size = seg_size(offset1, offset);
380 Ok(())
384 }
385
386 pub fn iter(&mut self) -> Result<FilePlayerIterator> {
390 let mut file = self.file.try_clone()?;
391 file.seek(SeekFrom::Start(get_data_start_offset(
392 self.segments.len() as u32
393 )))?;
394 let decoder = Some(zstd::Decoder::new(file)?);
395
396 Ok(FilePlayerIterator { fr: self, decoder })
397 }
398
399 pub fn iter_from_segment(&mut self, seg_num: u32) -> Result<FilePlayerIterator> {
402 let mut file = self.file.try_clone()?;
403
404 let offset = self
405 .segments
406 .iter()
407 .take(seg_num as usize)
408 .map(|s| s.size as u64)
409 .sum::<u64>()
410 + self.get_data_start_offset();
411 file.seek(SeekFrom::Start(offset))?;
412
413 let decoder = Some(zstd::Decoder::new(file)?);
414
415 Ok(FilePlayerIterator { fr: self, decoder })
416 }
417
418 pub fn iter_from_segment_with_rn(&mut self, rn: u64) -> Result<FilePlayerIterator> {
421 if rn < self.first_rec_num {
422 return self.iter_from_segment(0);
424 }
425 let rel_rn = (rn - self.first_rec_num) as u32;
426
427 let mut seg_num = None;
429 for (i, seg) in self.segments.iter().enumerate() {
430 if seg.size == 0 {
431 break; }
433 if seg.last_rn >= rel_rn {
434 seg_num = Some(i as u32);
435 break;
436 }
437 }
438
439 if let Some(seg_num) = seg_num {
440 self.iter_from_segment(seg_num)
441 } else {
442 Ok(FilePlayerIterator {
444 fr: self,
445 decoder: None,
446 })
447 }
448 }
449
450 pub fn get_data_start_offset(&self) -> u64 {
451 get_data_start_offset(self.segments.len() as u32)
452 }
453
454 pub fn get_last_record_number(&self) -> Option<u64> {
455 self.segments
456 .iter()
457 .rev()
458 .find(|s| s.size > 0)
459 .map(|s| s.last_rn)
460 .map(|rn| rn as u64 + self.first_rec_num)
461 }
462}
463
464fn verify_consistency(segments: &[SegmentInfo]) -> Result<()> {
468 for i in 1..segments.len() {
469 if segments[i - 1].size == 0 && segments[i].size > 0 {
470 return Err(YgwError::CorruptedRecordingFile(format!(
471 "The segment {i} of size>0 is following a segment of size 0"
472 )));
473 }
474 if segments[i].size > 0 && segments[i].last_rn <= segments[i - 1].last_rn {
475 return Err(YgwError::CorruptedRecordingFile(format!(
476 "The last record numbers are not increasing"
477 )));
478 }
479 }
480 Ok(())
481}
482
/// Iterator over the records of a recording file, yielding the absolute
/// record number and the data of each record.
pub struct FilePlayerIterator<'b> {
    /// Player the iterator was created from; provides the first record number
    /// added to the relative record numbers read from the file.
    fr: &'b FilePlayer,
    /// Decoder positioned in the file; `None` when there is nothing to read or
    /// once the end of the data has been reached.
    decoder: Option<zstd::Decoder<'b, BufReader<File>>>,
}
487
488impl<'b> Iterator for FilePlayerIterator<'b> {
489 type Item = Result<(u64, Vec<u8>)>;
490
491 fn next(&mut self) -> Option<Self::Item> {
492 match self.decoder {
493 None => None,
494 Some(ref mut decoder) => match read_record(decoder) {
495 Ok((rec_num, data)) => Some(Ok((self.fr.first_rec_num + rec_num as u64, data))),
496 Err(YgwError::IOError(_, e)) if e.kind() == std::io::ErrorKind::UnexpectedEof => {
497 self.decoder.take();
498 None
499 }
500 Err(e) => Some(Err(e)),
501 },
502 }
503 }
504}
505
506fn read_record(decoder: &mut zstd::Decoder<BufReader<File>>) -> Result<(u32, Vec<u8>)> {
507 let size = decoder.read_u32::<BigEndian>()?;
508 if size > MAX_REC_SIZE {
509 return Err(YgwError::DecodeError(format!(
510 "Found record larger than the maximum allowed ({} > {})",
511 size, MAX_REC_SIZE
512 )));
513 }
514 if size < 4 {
515 return Err(YgwError::DecodeError(format!(
516 "Invalid record size {} (expected at least 4)",
517 size
518 )));
519 }
520 let rec_num = decoder.read_u32::<BigEndian>()?;
521 let mut buf = vec![0; size as usize - 4];
522 decoder.read_exact(&mut buf)?;
523
524 Ok((rec_num, buf))
525}
526
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Result;
    use tempfile::tempdir;

    /// Records into a file of 2 segments of 2 records each and re-opens the
    /// file with a player after each step, while the recorder is still alive.
    #[test]
    fn test1() {
        let tmp_dir = tempdir().unwrap();
        let path = tmp_dir.path().join("test1");

        // first_rec_num = 1000, max_num_segments = 2, max_nrps = 2
        let mut recorder = FileRecorder::new(&path, 1000, 2, 2).unwrap();

        // Opening with a first record number different from the header fails.
        let r = FilePlayer::open(&path, 300);
        assert!(matches!(r, Err(_)));

        // Nothing has been appended yet: the iterator yields no record.
        let mut player = FilePlayer::open(&path, 1000).unwrap();
        let mut it = player.iter().unwrap();
        assert!(it.next().is_none());

        let data = vec![1u8; 10];

        // One record, flushed but with the segment not ended yet.
        recorder.append(1000, &data, false).unwrap();
        recorder.flush().unwrap();

        let mut player = FilePlayer::open(&path, 1000).unwrap();
        assert_eq!(player.first_rec_num, 1000);
        assert_eq!(player.segments.len(), 2);

        let mut it = player.iter().unwrap();
        check_equals(it.next(), 1000, &data);
        assert!(it.next().is_none());

        // The second record reaches max_nrps and ends the first segment.
        recorder.append(2000, &data, false).unwrap();
        recorder.flush().unwrap();

        let mut player = FilePlayer::open(&path, 1000).unwrap();
        let mut it = player.iter().unwrap();
        check_equals(it.next(), 1000, &data);
        check_equals(it.next(), 2000, &data);
        assert!(it.next().is_none());

        // The fourth record ends the second and last segment: the recorder
        // becomes full.
        recorder.append(3000, &data, false).unwrap();
        assert_eq!(false, recorder.is_full());
        recorder.append(3001, &data, false).unwrap();
        assert_eq!(true, recorder.is_full());

        let mut player = FilePlayer::open(&path, 1000).unwrap();
        let mut it = player.iter().unwrap();
        check_equals(it.next(), 1000, &data);
        check_equals(it.next(), 2000, &data);
        check_equals(it.next(), 3000, &data);
        check_equals(it.next(), 3001, &data);
        assert!(it.next().is_none());
    }

    /// Asserts that `actual` is a successfully read record with the expected
    /// record number and data; panics with both values otherwise.
    fn check_equals(
        actual: Option<Result<(u64, Vec<u8>)>>,
        expected_rec_num: u64,
        expected_data: &[u8],
    ) {
        if let Some(Ok((rec_num, ref data))) = actual {
            assert_eq!(rec_num, expected_rec_num);
            assert_eq!(data, expected_data);
        } else {
            panic!(
                "did not match, expected ({expected_rec_num}, {:?}) but got {:?}",
                expected_data, actual
            );
        }
    }
}