1use std::fs::{File, OpenOptions};
2use std::io::{self, BufRead, BufReader, Read, Seek, SeekFrom, Write};
3use std::marker::PhantomData;
4use std::path::PathBuf;
5
6use byteorder::{BigEndian, ReadBytesExt};
7use bytes::{BufMut, BytesMut};
8use errors::*;
9use protobuf::Message;
10use util;
11
12use common::FileID;
13
/// Offset of the big-endian `u64` message-size field within a record.
const RECORD_SIZE_OFFSET: usize = 0;
/// Offset of the big-endian `u32` checksum of the 8-byte size field.
const RECORD_SIZE_CRC_OFFSET: usize = RECORD_SIZE_OFFSET + 8;
/// Offset at which the serialized message bytes begin (header is 8 + 4 bytes).
const RECORD_DATA_OFFSET: usize = RECORD_SIZE_CRC_OFFSET + 4;
20
21#[inline(always)]
25pub fn payload_size_for_record<M>(msg: &M) -> u64
26where
27 M: protobuf::Message,
28{
29 msg.compute_size() as u64 + 16
30}
31
32pub trait RecordRead {
41 type Reader: BufRead + Seek;
42 type Message: protobuf::Message;
43
44 fn reader<'a>(&'a mut self) -> Result<&'a mut Self::Reader>;
47
48 fn read_exact_from_current_offset(&mut self, bytes: &mut [u8]) -> Result<()> {
51 self.reader()?.read_exact(bytes).map_err(|e| e.into())
52 }
53
54 fn read_exact(
58 &mut self,
59 offset_from_start: u64,
60 bytes: &mut [u8],
61 ) -> Result<()> {
62 self.reader()?.seek(SeekFrom::Start(offset_from_start))?;
63 self.read_exact_from_current_offset(bytes)
64 }
65
66 fn eof(&mut self) -> Result<bool> {
68 Ok(self.reader()?.fill_buf()?.is_empty())
69 }
70
71 fn next_record(&mut self) -> Result<Option<Self::Message>> {
74 let mut header = [0u8; RECORD_DATA_OFFSET];
75 if self.eof()? {
76 return Ok(None);
77 }
78 self
79 .read_exact_from_current_offset(&mut header[..])
80 .chain_err(|| "Error reading record size and record size CRC")?;
81 let record_size = (&header[RECORD_SIZE_OFFSET..RECORD_SIZE_CRC_OFFSET])
82 .read_u64::<BigEndian>()
83 .chain_err(|| "Error reading record size from serialized header")?;
84 let record_size_crc = (&header[RECORD_SIZE_CRC_OFFSET..])
85 .read_u32::<BigEndian>()
86 .chain_err(|| "Error reading record size CRC from serialized header")?;
87 if util::checksum_crc32(&header[RECORD_SIZE_OFFSET..RECORD_SIZE_CRC_OFFSET])
88 != record_size_crc
89 {
90 return Err(
91 ErrorKind::InvalidData("CRC failed for record size".to_string()).into(),
92 );
93 }
94 let mut data_and_crc = vec![0u8; (record_size + 4) as usize];
95 self
96 .read_exact_from_current_offset(&mut data_and_crc)
97 .chain_err(|| "Error reading record data")?;
98 let data_crc =
99 (&data_and_crc[record_size as usize..]).read_u32::<BigEndian>()?;
100 if util::checksum_crc32(&data_and_crc[..record_size as usize]) != data_crc {
101 return Err(
102 ErrorKind::InvalidData("CRC failed for record data".to_string()).into(),
103 );
104 }
105 protobuf::parse_from_bytes::<Self::Message>(
106 &data_and_crc[..record_size as usize],
107 )
108 .chain_err(|| "Error parsing data")
109 .map(|m| Some(m))
110 }
111
112 fn record_at_offset(
115 &mut self,
116 offset_from_start: u64,
117 ) -> Result<Option<Self::Message>> {
118 self.reader()?.seek(SeekFrom::Start(offset_from_start))?;
119 self.next_record()
120 }
121
122 fn read_record(
126 &mut self,
127 offset_from_start: u64,
128 payload_size: u64,
129 ) -> Result<Self::Message> {
130 let mut payload = vec![0u8; payload_size as usize];
131 self.read_exact(offset_from_start, &mut payload)?;
132 let mut record_size_slice =
133 &payload[RECORD_SIZE_OFFSET..RECORD_SIZE_CRC_OFFSET];
134 let record_size_slice_crc = util::checksum_crc32(record_size_slice);
135 let record_size = record_size_slice
136 .read_u64::<BigEndian>()
137 .chain_err(|| format!("Error reading record size from bytes"))?;
138 let record_size_crc = (&payload
139 [RECORD_SIZE_CRC_OFFSET..RECORD_DATA_OFFSET])
140 .read_u32::<BigEndian>()
141 .chain_err(|| format!("Error reading record size CRC from bytes"))?;
142 if record_size_slice_crc != record_size_crc {
143 return Err(
144 ErrorKind::InvalidData("CRC failed for record size".to_string()).into(),
145 );
146 }
147 let record_data_crc_offset = RECORD_DATA_OFFSET + record_size as usize;
148 let record_data = &payload[RECORD_DATA_OFFSET..record_data_crc_offset];
149 let data_crc =
150 (&payload[record_data_crc_offset..]).read_u32::<BigEndian>()?;
151 if util::checksum_crc32(record_data) != data_crc {
152 return Err(
153 ErrorKind::InvalidData("CRC failed for record data".to_string()).into(),
154 );
155 }
156 protobuf::parse_from_bytes::<Self::Message>(record_data)
157 .chain_err(|| "Error reading message body from bytes")
158 .map_err(|e| e.into())
159 }
160}
161
/// Result of appending a record: `(offset, length)`, where `offset` is the
/// value of `RecordAppend::tell` just before the write and `length` is the
/// number of bytes written for the record.
pub type AppendRecordResult = Result<(u64, u64)>;
165
/// A [`Write`] sink that can force written data down to durable storage.
///
/// The provided `write` and `write_all` shadow the [`Write`] methods of the
/// same names. Call them as `SyncWrite::write_all(w, buf)` to get the
/// write-flush-sync behavior; a plain `w.write_all(buf)` is ambiguous when both
/// traits are in scope.
pub trait SyncWrite: Write {
    /// Forces previously written data to the underlying storage.
    fn sync(&mut self) -> std::io::Result<()>;

    /// Performs a single [`Write::write`], then flushes and syncs.
    ///
    /// Returns the byte count reported by the underlying `write`, which may be
    /// less than `buf.len()`.
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let n = Write::write(self, buf)?;
        // Flush any user-space buffering before syncing, as `write_all` does;
        // otherwise a buffered writer could sync without the new bytes.
        Write::flush(self)?;
        self.sync()?;
        Ok(n)
    }

    /// Writes all of `buf`, then flushes and syncs.
    fn write_all(&mut self, buf: &[u8]) -> std::io::Result<()> {
        Write::write_all(self, buf)?;
        Write::flush(self)?;
        self.sync()?;
        Ok(())
    }
}

impl SyncWrite for File {
    /// Delegates to [`File::sync_data`], which may skip syncing file metadata
    /// that is not needed to read the data back.
    fn sync(&mut self) -> std::io::Result<()> {
        self.sync_data()
    }
}
188
189pub trait RecordAppend {
190 type Writer: SyncWrite + Seek;
191 type Message: protobuf::Message;
192
193 fn writer<'a>(&'a mut self) -> Result<&'a mut Self::Writer>;
194
195 fn tell(&mut self) -> Result<u64> {
200 self
201 .writer()?
202 .seek(SeekFrom::Current(0))
203 .map_err(|e| e.into())
204 }
205
206 fn append_record(&mut self, record: &Self::Message) -> AppendRecordResult {
211 let offset = self.tell()?;
212 let payload_size_bytes = payload_size_for_record(record);
213 let mut payload = BytesMut::with_capacity(payload_size_bytes as usize);
214 payload.put_u64_be(record.compute_size() as u64);
215 let record_size_crc = util::checksum_crc32(&payload[..8]);
216 payload.put_u32_be(record_size_crc);
217 let record_data: Vec<u8> = record.write_to_bytes()?;
218 let record_crc = util::checksum_crc32(&record_data);
219 payload.put(record_data);
220 payload.put_u32_be(record_crc);
221 SyncWrite::write_all(self.writer()?, &payload)?;
222 Ok((offset, payload.len() as u64))
223 }
224
225 fn retreat(&mut self, bytes_to_retreat: u64) -> Result<u64> {
228 self.writer()?.flush()?;
229 self
230 .writer()?
231 .seek(SeekFrom::Current(-(bytes_to_retreat as i64)))
232 .map_err(|e| e.into())
233 }
234}
235
/// In-memory `RecordRead` / `RecordAppend` implementations backed by
/// `Cursor`s, for use in unit tests.
#[cfg(test)]
pub mod test_utils {
    use super::*;
    use bitrust_pb::BitRustDataRecord;
    use std::io::Cursor;

    /// Lets an in-memory cursor act as a `SyncWrite` sink; "syncing" is just
    /// a flush.
    impl SyncWrite for Cursor<Vec<u8>> {
        fn sync(&mut self) -> std::io::Result<()> {
            Write::flush(self)
        }
    }

    /// Reads `BitRustDataRecord`s from any byte buffer wrapped in a `Cursor`.
    pub struct CursorBasedReader<T> {
        pub cursor: Cursor<T>,
    }

    impl<T: AsRef<[u8]>> RecordRead for CursorBasedReader<T> {
        type Reader = Cursor<T>;
        type Message = BitRustDataRecord;

        fn reader(&mut self) -> Result<&mut Self::Reader> {
            let cursor = &mut self.cursor;
            Ok(cursor)
        }
    }

    /// Appends `BitRustDataRecord`s into a growable in-memory buffer.
    pub struct CursorBasedWriter {
        pub cursor: Cursor<Vec<u8>>,
    }

    impl RecordAppend for CursorBasedWriter {
        type Writer = Cursor<Vec<u8>>;
        type Message = BitRustDataRecord;

        fn writer(&mut self) -> Result<&mut Self::Writer> {
            let cursor = &mut self.cursor;
            Ok(cursor)
        }
    }
}
274
275#[derive(Debug)]
280pub struct FileBasedRecordReader<T> {
281 reader: BufReader<File>,
282 pub path: PathBuf,
283 pub id: FileID,
284 phantom: PhantomData<T>,
285}
286
287impl<T: Message> FileBasedRecordReader<T> {
288 pub fn new(path: PathBuf) -> io::Result<FileBasedRecordReader<T>> {
289 let f = FileBasedRecordReader::<T> {
290 reader: BufReader::new(
291 OpenOptions::new().read(true).create(false).open(&path)?,
292 ),
293 id: util::file_id_from_path(&path),
294 path: path,
295 phantom: PhantomData,
296 };
297 Ok(f)
298 }
299}
300
301#[derive(Debug)]
302pub struct FileBasedRecordRW<T> {
303 writer: File,
304 reader: BufReader<File>,
305 pub path: PathBuf,
306 pub id: FileID,
307 pub w_offset: u64,
308 phantom: std::marker::PhantomData<T>,
309}
310
311impl<T: Message> FileBasedRecordRW<T> {
312 pub fn new(path: PathBuf) -> io::Result<FileBasedRecordRW<T>> {
313 let mut writer = OpenOptions::new()
314 .read(true)
315 .write(true)
316 .create(true)
317 .open(&path)?;
318 let w_offset = writer.seek(SeekFrom::End(0))?;
319 let f = FileBasedRecordRW::<T> {
320 writer,
321 reader: BufReader::new(
322 OpenOptions::new().read(true).create(false).open(&path)?,
323 ),
324 id: util::file_id_from_path(&path),
325 path: path,
326 phantom: PhantomData,
327 w_offset: w_offset,
328 };
329 debug!("Initialized active file");
330 Ok(f)
331 }
332
333 pub fn append_record(&mut self, record: &T) -> AppendRecordResult {
334 let (offset, payload_len) = RecordAppend::append_record(self, record)?;
335 self.w_offset += payload_len;
336 Ok((offset, payload_len))
337 }
338
339 pub fn retreat(&mut self, bytes_to_retreat: u64) -> Result<u64> {
340 self.w_offset = RecordAppend::retreat(self, bytes_to_retreat)?;
341 Ok(self.w_offset)
342 }
343}
344
345impl<T> Into<FileBasedRecordReader<T>> for FileBasedRecordRW<T> {
346 fn into(self) -> FileBasedRecordReader<T> {
347 FileBasedRecordReader {
348 reader: self.reader,
349 path: self.path,
350 id: self.id,
351 phantom: self.phantom,
352 }
353 }
354}
355
356impl<T: Message> RecordRead for FileBasedRecordReader<T> {
357 type Message = T;
358 type Reader = BufReader<File>;
359 fn reader<'a>(&'a mut self) -> Result<&'a mut BufReader<File>> {
360 Ok(&mut self.reader)
361 }
362}
363
364impl<T: Message> RecordRead for FileBasedRecordRW<T> {
365 type Message = T;
366 type Reader = BufReader<File>;
367 fn reader<'a>(&'a mut self) -> Result<&'a mut BufReader<File>> {
368 Ok(&mut self.reader)
369 }
370}
371
372impl<T: Message> RecordAppend for FileBasedRecordRW<T> {
373 type Message = T;
374 type Writer = File;
375 fn writer<'a>(&'a mut self) -> Result<&'a mut File> {
376 Ok(&mut self.writer)
377 }
378
379 fn tell(&mut self) -> Result<u64> {
380 Ok(self.w_offset)
381 }
382}
383
#[cfg(test)]
mod record_reader_tests {
    use super::*;
    use bitrust_pb::BitRustDataRecord;
    extern crate simplelog;
    extern crate tempfile;
    use super::test_utils::*;
    use byteorder::{BigEndian, ByteOrder};
    use bytes::{BufMut, BytesMut};
    use std::io::Cursor;

    /// Builds a test record with timestamp 42 and the given key and value.
    fn sample_record(key: &[u8], value: &[u8]) -> BitRustDataRecord {
        let mut rec = BitRustDataRecord::new();
        rec.set_timestamp(42);
        rec.set_key(key.to_vec());
        rec.set_value(value.to_vec());
        rec
    }

    /// Hand-encodes `rec` in the on-disk record layout, independently of
    /// `RecordAppend::append_record`: size, size checksum, message bytes, data
    /// checksum (all integers big-endian).
    fn encode_record(rec: &BitRustDataRecord) -> Vec<u8> {
        let rec_data = rec.write_to_bytes().expect("Vector of bytes");
        let mut buf = vec![0u8; payload_size_for_record(rec) as usize];
        BigEndian::write_u64(
            &mut buf[RECORD_SIZE_OFFSET..RECORD_SIZE_CRC_OFFSET],
            rec.compute_size() as u64,
        );
        let rec_size_crc = util::checksum_crc32(&buf[RECORD_SIZE_OFFSET..RECORD_SIZE_CRC_OFFSET]);
        BigEndian::write_u32(
            &mut buf[RECORD_SIZE_CRC_OFFSET..RECORD_DATA_OFFSET],
            rec_size_crc,
        );
        let record_data_crc_offset = RECORD_DATA_OFFSET + rec.compute_size() as usize;
        buf[RECORD_DATA_OFFSET..record_data_crc_offset].copy_from_slice(&rec_data);
        BigEndian::write_u32(
            &mut buf[record_data_crc_offset..],
            util::checksum_crc32(&rec_data),
        );
        buf
    }

    /// Copies `encoded` into a buffer with one zero byte before and after it,
    /// so the record starts at offset 1.
    fn surround_with_padding(encoded: &[u8]) -> Vec<u8> {
        let mut buf = vec![0u8; encoded.len() + 2];
        buf[1..encoded.len() + 1].copy_from_slice(encoded);
        buf
    }

    #[test]
    fn test_next_record_fails_for_incomplete_data_at_end() {
        // Three bytes cannot hold even the 12-byte record header.
        let mut reader = CursorBasedReader {
            cursor: Cursor::new(vec![0u8, 0u8, 1u8]),
        };
        let result = reader.next_record();
        assert!(result.is_err(), "Expected error, but got {:?}", result);
    }

    #[test]
    fn test_next_record_succeeds_with_empty_stream() {
        let mut reader = CursorBasedReader {
            cursor: Cursor::new(Vec::<u8>::new()),
        };
        let rec = reader.next_record();
        match rec {
            Ok(None) => {}
            _ => panic!("Expected Ok(None), but got {:?}", rec),
        }
    }

    #[test]
    fn test_next_record_succeeds_with_nonempty_stream() {
        let record = sample_record(b"foo", b"bar");
        let record_data = record
            .write_to_bytes()
            .expect("Failed to write test record to bytes");
        // Encoded with BytesMut instead of `encode_record`, so the layout is
        // also checked through a second, independent encoding path.
        let mut b = BytesMut::with_capacity(payload_size_for_record(&record) as usize);
        b.put_u64_be(record.compute_size() as u64);
        let size_crc = util::checksum_crc32(&b[..8]);
        b.put_u32_be(size_crc);
        b.put_slice(&record_data);
        b.put_u32_be(util::checksum_crc32(&record_data));
        let mut reader = CursorBasedReader {
            cursor: Cursor::new(b),
        };

        let first = reader
            .next_record()
            .expect("Failed to read next record from cursor");
        assert_eq!(first, Some(record));
        let second = reader.next_record().expect("Expected ok result at EOF");
        assert!(second.is_none(), "Expected None, got {:?}", second);
    }

    #[test]
    fn test_read_record_works() {
        let rec = sample_record(b"k", b"v");
        let expected_buf = encode_record(&rec);
        let mut reader = CursorBasedReader {
            cursor: Cursor::new(surround_with_padding(&expected_buf)),
        };

        let read_rec = reader
            .read_record(1, expected_buf.len() as u64)
            .expect("Non-error record");
        assert_eq!(read_rec, rec);
    }

    #[test]
    fn test_read_record_fails_for_invalid_payload_spec() {
        let rec = sample_record(b"k", b"v");
        let expected_buf = encode_record(&rec);
        let mut reader = CursorBasedReader {
            cursor: Cursor::new(surround_with_padding(&expected_buf)),
        };

        // Offset 0 is one byte before the record, so the header is misaligned.
        let read_rec = reader.read_record(0, expected_buf.len() as u64);
        assert!(read_rec.is_err(), "Expected error, got {:?}", read_rec);
    }
}
537
#[cfg(test)]
mod record_appender_tests {
    use super::test_utils::*;
    use super::*;
    use bitrust_pb::BitRustDataRecord;
    extern crate simplelog;
    extern crate tempfile;
    use byteorder::{BigEndian, ByteOrder};
    use std::io::Cursor;

    /// Builds a test record with timestamp 42 and the given key and value.
    fn sample_record(key: &[u8], value: &[u8]) -> BitRustDataRecord {
        let mut rec = BitRustDataRecord::new();
        rec.set_timestamp(42);
        rec.set_key(key.to_vec());
        rec.set_value(value.to_vec());
        rec
    }

    /// Returns a writer over an empty in-memory buffer.
    fn empty_writer() -> CursorBasedWriter {
        CursorBasedWriter {
            cursor: Cursor::new(Vec::new()),
        }
    }

    #[test]
    fn test_retreat_works() {
        let first = sample_record(b"k", b"v");
        let mut writer = empty_writer();
        let (offset, size) = writer.append_record(&first).expect("Writing record");
        assert_eq!((offset, size), (0, payload_size_for_record(&first)));
        let position = writer.retreat(size).expect("Retreat should succeed");
        assert_eq!(position, 0);

        // The second record overwrites the first from offset 0.
        let second = sample_record(b"k2", b"v2");
        writer
            .append_record(&second)
            .expect("Writing record after retreat");

        let mut reader = CursorBasedReader {
            cursor: Cursor::new(writer.cursor.into_inner()),
        };
        let read_rec = reader
            .next_record()
            .expect("Read record")
            .expect("Some record");
        assert_eq!(read_rec, second);
    }

    #[test]
    fn test_append_record_works() {
        let rec = sample_record(b"k", b"v");
        let mut writer = empty_writer();
        let appended = writer
            .append_record(&rec)
            .expect("Writing record should succeed");
        assert_eq!(appended, (0, payload_size_for_record(&rec)));

        // Hand-encode the expected bytes: size, size checksum, data, data checksum.
        let rec_data = rec.write_to_bytes().expect("Vector of bytes");
        let mut expected_buf = vec![0u8; payload_size_for_record(&rec) as usize];
        BigEndian::write_u64(
            &mut expected_buf[RECORD_SIZE_OFFSET..RECORD_SIZE_CRC_OFFSET],
            rec.compute_size() as u64,
        );
        let rec_size_crc =
            util::checksum_crc32(&expected_buf[RECORD_SIZE_OFFSET..RECORD_SIZE_CRC_OFFSET]);
        BigEndian::write_u32(
            &mut expected_buf[RECORD_SIZE_CRC_OFFSET..RECORD_DATA_OFFSET],
            rec_size_crc,
        );
        let record_data_crc_offset = RECORD_DATA_OFFSET + rec.compute_size() as usize;
        expected_buf[RECORD_DATA_OFFSET..record_data_crc_offset].copy_from_slice(&rec_data);
        BigEndian::write_u32(
            &mut expected_buf[record_data_crc_offset..],
            util::checksum_crc32(&rec_data),
        );

        let written_buf = writer.cursor.into_inner();
        assert_eq!(written_buf, expected_buf);
    }
}